Fix memory leak in dramatiq.Worker:process_message #351
Conversation
The memory leak takes place only when an exception is thrown. The problem has been traced down to the exception handler of `process_message`, and more specifically to how the exception object is handled and stored in order to be referred back to down the road, e.g. for debugging purposes or by the results middleware. When `message.stuff_exception(e)` is called, a cyclic reference is created, which causes objects not to be garbage-collected (at least not until the cyclic GC kicks in).

In general, a cyclic reference is created whenever an exception object is stored in a variable `x` that is part of a stack frame. The exception object references the stack frame, and thus the variable `x`, via its `__traceback__` attribute. But now `x` references the exception object too, and thus the stack frame. So a cyclic reference is created! In this particular case, the reference cycle is: `message -> message._exception -> e -> stackframe -> message`.

In general, it is not recommended to store exception objects in local variables. If that's necessary, such variables should always be explicitly cleared in order to break the cycle, usually in a `try/finally` statement. In that sense, it comes in handy that a `try/finally` statement is already being used: by setting `message._exception` to `None` at the very end, the reference cycle is broken and objects are garbage-collected soon enough.
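The cycle described above can be reproduced in isolation. The sketch below uses a hypothetical `Message` stand-in (not dramatiq's actual class) to show that plain reference counting cannot free the cycle and only the cyclic GC can:

```python
import gc
import weakref

class Message:
    """Hypothetical stand-in for dramatiq's Message; not the real class."""
    def stuff_exception(self, exception):
        self._exception = exception

def process_message(message):
    try:
        raise RuntimeError("boom")
    except RuntimeError as e:
        # e.__traceback__ references this stack frame, and the frame's
        # local `message` references the message, so storing e creates:
        # message -> message._exception -> e -> frame -> message
        message.stuff_exception(e)

gc.disable()                   # rule out the cyclic collector for a moment
msg = Message()
process_message(msg)
ref = weakref.ref(msg)
del msg
assert ref() is not None       # plain refcounting cannot free the cycle
gc.enable()
gc.collect()
assert ref() is None           # only the cyclic GC reclaims it
```

If `process_message` instead cleared `message._exception = None` in a `finally` block, the cycle would never form and the first assertion would fail: the message would be freed immediately by reference counting.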
I see that the tests fail because of this change. I haven't fixed it yet in case you have a specific fix in mind. A possible fix could be something like:
```diff
diff --git a/dramatiq/brokers/stub.py b/dramatiq/brokers/stub.py
index 77317c9..8562998 100644
--- a/dramatiq/brokers/stub.py
+++ b/dramatiq/brokers/stub.py
@@ -170,11 +170,16 @@ class StubBroker(Broker):
         else:
             if fail_fast:
                 for message in self.dead_letters_by_queue[queue_name]:
-                    raise message._exception from None
+                    raise StubBrokerException(message.serialized_exception['msg'])
             return
 
+
+class StubBrokerException(Exception):
+    """An exception specifically raised in unit tests."""
+    pass
+
+
 class _StubConsumer(Consumer):
     def __init__(self, queue, dead_letters, timeout):
         self.queue = queue
```

This way you also avoid accessing the private `_exception` attribute. Again, the above is merely a quick suggestion that came to mind :) I haven't really given much thought to it.
Thanks for digging into this and finding the problem! I'd like to preserve the current behavior, and I wonder if adding a

```python
def __del__(self):
    self._exception = None
```

to the message class would do the trick.
I'm afraid that's not going to work. The problem here is that …
The garbage collector detects cycles such as these and correctly calls `__del__` on their members.

I do agree that it would be ideal to break the cycle if possible, but it might not be possible to do that and preserve the current functionality (I haven't had time to think things through yet), and my preference is to put in a hack rather than break backwards-compatibility.
The cyclic garbage collector detects cycles such as these, yes. Yet I'm not utterly familiar with its intricacies. The cyclic garbage collector runs periodically, attempts to detect cyclic references, and garbage-collects the respective objects. It works off object generations: if it fails to garbage-collect an object due to a reference cycle, it will advance the object to the next generation pool (a total of 3 pools). When an object is moved to a higher generation pool, the frequency with which the cyclic garbage collector attempts to collect that object decreases, I believe. The point here is that the cyclic garbage collector may still be able to release the objects in question at some point in the future, even without any changes.

By the way, by calling … However, just by adding a … So, to sum this up:
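The generational behavior described above can be observed directly with the `gc` module. This is a sketch against CPython (generation semantics vary slightly across versions): objects that survive a young-generation pass are promoted, after which a young-generation pass alone no longer sees them.

```python
import gc
import weakref

class Node:
    pass

gc.disable()                      # take manual control of the collector
gc.collect()                      # flush every generation first

a = Node(); b = Node()
a.other, b.other = b, a           # a <-> b reference cycle
ref = weakref.ref(a)

gc.collect(0)                     # a and b survive (still reachable) and
                                  # are promoted out of generation 0
del a, b                          # now only the cycle keeps them alive

gc.collect(0)                     # a gen-0 pass doesn't scan older pools...
assert ref() is not None          # ...so the cycle is still uncollected

gc.collect()                      # a full collection scans every generation
assert ref() is None              # cycle finally reclaimed
gc.enable()
```

This matches the point above: the cycle is not *unreclaimable*, it just waits for a collection pass that covers its (older) generation.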
Even if (4) works, I'm not entirely sure it will work as well as …

On another note: how badly are we going to break compatibility? Is there any other place besides the …

P.S. I even tested just adding a …
Thanks for trying out my suggestion. I'm surprised it didn't help, and I'll try to find some time this weekend to investigate things myself. The point of the …
@Bogdanp can you please explain why you think a `__del__` would help here?
@dimrozakis take a look at PEP 442, in particular the section titled Disposal of cyclic isolates.
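For readers following along: the PEP 442 behavior referenced here (since Python 3.4, `__del__` is called even on objects that are part of an unreachable reference cycle, and the cycle is then freed) can be checked in a few lines:

```python
import gc
import weakref

class A:
    finalized = False

    def __del__(self):
        # Called by the cyclic GC since Python 3.4 (PEP 442),
        # even though `self` is part of a reference cycle.
        A.finalized = True

a = A()
a.self_ref = a            # self-referential cycle
ref = weakref.ref(a)
del a                     # only the cycle keeps the object alive

gc.collect()
assert A.finalized        # the finalizer ran despite the cycle
assert ref() is None      # and the object was reclaimed, not leaked
```

Before PEP 442, such objects would have been parked in `gc.garbage` instead of being collected.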
That makes sense. I agree.
I'm not utterly familiar with this TBH, yet I find it hard to believe that the generation pool an object is in isn't relevant. What's the guarantee that the GC will (successfully) collect older generations' objects, or even run for older (or more specifically the oldest) generation at all, before requesting extra memory from the OS? If there's no guarantee, couldn't one argue that that's what a memory leak is actually about?
Agreed. That makes sense based on PEP 442. Didn't mean to imply the opposite.
Regarding the Results middleware - aren't all middlewares run sequentially in the same thread after a task has finished running? If so, the …
I see your point. Now, regarding the PEP 442 you mentioned - I don't see how that's helpful, TBH. The PEP talks about object finalization in reference cycles. That implies that the reference cycle has already been identified, no? If so, it is garbage-collectable, isn't it? Additionally, in our case the …
I've refreshed my memory about this and I did have it wrong. It does seem like it would be easy to have memory bloat issues if you allocate very large objects infrequently due to the way the threshold stats work.
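The threshold mechanics mentioned here can be inspected with the `gc` module. The sketch below (the `Blob` class is hypothetical) illustrates the bloat scenario: the gen-0 counter tracks object *counts*, not sizes, so a handful of huge allocations barely moves it.

```python
import gc

# A gen-0 collection triggers only once (allocations - deallocations)
# of gc-tracked objects exceeds the first threshold.
print(gc.get_threshold())   # e.g. (700, 10, 10); defaults vary by version

class Blob:
    """Hypothetical holder for a large buffer."""
    def __init__(self, nbytes):
        self.data = bytearray(nbytes)

# Each Blob advances the gen-0 counter by just 1, however big it is, so
# hundreds of these (gigabytes of cyclic garbage) could accumulate
# before a gen-0 pass ever runs.
blobs = [Blob(10 * 1024 * 1024) for _ in range(3)]   # ~10 MB each
print(gc.get_count())       # per-generation counters
```

This is why infrequent allocation of very large objects can cause visible memory bloat even though nothing is permanently leaked.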
Yep, that's right, so this is only a problem for the …
In my rush to reply I kind of lost track of the original point of the discussion. I'd originally offered up the finalizer as a way to break the cycle, but that's not necessary since the GC will detect that the graph is unreachable and release those objects. Sorry for the confusion! I only have so much time to deal with these things during the week. I'll push a fix for this today.
This should now be fixed on master. Thanks again for finding the issue and the discussion.
The following piece of code can be used to reproduce the problem (the snippet and the before/after memory screenshots were attached to the original report). After running `for _ in range(5): foo.send()` the memory leak is evident; with the fix applied, it is gone.