Exception when worker gets Pandas DataFrame as argument #631

Open
brakhane opened this issue Jun 22, 2024 · 1 comment


brakhane commented Jun 22, 2024

What OS are you using?

Windows 10

What version of Dramatiq are you using?

Version: 1.17.0

What did you do?

I ran into an issue where a Dramatiq worker takes a DataFrame as an argument. When more than one message is in the queue, the worker's consumer thread raises an exception: MessageProxy's __eq__ (in broker.py) compares two messages using ==, the comparison reaches the DataFrame arguments, and pandas refuses to reduce the result of a DataFrame comparison to a single truth value.

Reproducible example:

import pandas as pd
import dramatiq

dramatiq.set_encoder(dramatiq.PickleEncoder()) 

@dramatiq.actor
def buggy(df: pd.DataFrame):
    pass

if __name__ == "__main__":
    for _ in range(10):  # the bug does not appear if only one message is sent
        buggy.send(pd.DataFrame([[1,2,3],[4,5,6]], [1,2]))

Starting the workers with dramatiq -t 1 -p 2 bug (to avoid out-of-memory issues) and then running the main program with python bug.py results in the following exception:

[2024-06-22 23:50:51,477] [PID 132100] [Thread-2] [dramatiq.worker.ConsumerThread(default)] [CRITICAL] Consumer encountered an unexpected error.
Traceback (most recent call last):
  File "C:\Users\Dennis\AppData\Roaming\Python\Python311\site-packages\dramatiq\worker.py", line 267, in run
    self.handle_message(message)
  File "C:\Users\Dennis\AppData\Roaming\Python\Python311\site-packages\dramatiq\worker.py", line 328, in handle_message
    self.work_queue.put((actor.priority, message))
  File "C:\Users\Dennis\AppData\Local\Programs\Python\Python311\Lib\queue.py", line 150, in put
    self._put(item)
  File "C:\Users\Dennis\AppData\Local\Programs\Python\Python311\Lib\queue.py", line 236, in _put
    heappush(self.queue, item)
  File "C:\Users\Dennis\AppData\Roaming\Python\Python311\site-packages\dramatiq\broker.py", line 394, in __eq__
    return self._message == other._message
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 4, in __eq__
  File "C:\Users\Dennis\AppData\Roaming\Python\Python311\site-packages\pandas\core\generic.py", line 1527, in __nonzero__
    raise ValueError(
ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
[2024-06-22 23:50:51,480] [PID 132100] [Thread-2] [dramatiq.worker.ConsumerThread(default)] [INFO] Restarting consumer in 3.00 seconds.

This is immediately followed by:

Exception in thread Thread-4:
Traceback (most recent call last):
  File "C:\Users\Dennis\AppData\Local\Programs\Python\Python311\Lib\threading.py", line 1038, in _bootstrap_inner
    self.run()
  File "C:\Users\Dennis\AppData\Roaming\Python\Python311\site-packages\dramatiq\worker.py", line 462, in run
    self.process_message(message)
  File "C:\Users\Dennis\AppData\Roaming\Python\Python311\site-packages\dramatiq\worker.py", line 525, in process_message
    self.work_queue.task_done()
  File "C:\Users\Dennis\AppData\Local\Programs\Python\Python311\Lib\queue.py", line 75, in task_done
    raise ValueError('task_done() called too many times')
ValueError: task_done() called too many times
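
The first traceback can be reproduced without Dramatiq or pandas, which may help narrow down the cause. The following is a minimal sketch, not Dramatiq's actual code: `FakeFrame`, `AmbiguousTruth`, `FakeMessage`, and `push_two` are hypothetical stand-ins, and the sketch assumes the queue holds `(priority, message)` tuples as shown in the `work_queue.put` call above. When two priorities are equal, tuple comparison falls through to `==` on the messages, which in turn compares the argument tuples and tries to take the truth value of the element-wise result.

```python
import heapq
from dataclasses import dataclass


class AmbiguousTruth:
    """Hypothetical stand-in for the element-wise result of DataFrame == DataFrame."""

    def __bool__(self):
        raise ValueError("The truth value is ambiguous.")


class FakeFrame:
    """Hypothetical stand-in for pd.DataFrame: == does not return a plain bool."""

    def __eq__(self, other):
        return AmbiguousTruth()


@dataclass
class FakeMessage:
    """Hypothetical stand-in for a queued message.

    The dataclass-generated __eq__ compares the fields, including `args`.
    """

    args: tuple
    message_id: int

    def __lt__(self, other):
        # Arbitrary ordering so that heapq can sort messages that are not equal.
        return self.message_id < other.message_id


def push_two(args_a, args_b):
    """Push two messages with the same priority onto a heap, like a priority queue."""
    queue = []
    heapq.heappush(queue, (0, FakeMessage(args_a, 1)))
    # The second push compares (0, msg2) with (0, msg1). The priorities are
    # equal, so Python evaluates msg2 == msg1, which compares the args tuples.
    heapq.heappush(queue, (0, FakeMessage(args_b, 2)))
    return queue


# Plain arguments compare without trouble.
push_two((1,), (2,))

# Frame-like arguments raise ValueError during the second push.
try:
    push_two((FakeFrame(),), (FakeFrame(),))
except ValueError as exc:
    print("second push failed:", exc)
```

This also matches the note in the reproduction script that a single message does not trigger the bug: pushing onto an empty heap performs no comparison.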

What did you expect would happen?

I expected Dramatiq not to raise an exception.

@wsturgiss

This doesn't really solve your issue, but here is a workaround that might help: we don't pass DataFrames directly to Dramatiq (partly because my use case involves massive DataFrames). Instead, we persist what we need to the DB, pass only a row id in the message, and fetch the data within the worker.
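
A rough sketch of that workaround, under assumptions: the helper names `save_payload` and `load_payload` and the `payloads` table are hypothetical, and SQLite with pickled blobs is used only to keep the example self-contained. A real setup would need a database that both the sender and the worker processes can reach, so the in-memory connection here is for illustration only.

```python
import pickle
import sqlite3


def save_payload(conn, payload):
    """Pickle `payload`, store it as a row, and return the new row id."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS payloads (id INTEGER PRIMARY KEY, data BLOB)"
    )
    cursor = conn.execute(
        "INSERT INTO payloads (data) VALUES (?)", (pickle.dumps(payload),)
    )
    conn.commit()
    return cursor.lastrowid


def load_payload(conn, payload_id):
    """Fetch and unpickle the row with the given id (KeyError if it is missing)."""
    row = conn.execute(
        "SELECT data FROM payloads WHERE id = ?", (payload_id,)
    ).fetchone()
    if row is None:
        raise KeyError(payload_id)
    return pickle.loads(row[0])


# Sender side: persist first, then enqueue only the integer id,
# for example buggy.send(payload_id) with the actor from the report.
conn = sqlite3.connect(":memory:")
payload_id = save_payload(conn, {"rows": [[1, 2, 3], [4, 5, 6]]})

# Worker side: the actor receives the id and loads the data itself.
restored = load_payload(conn, payload_id)
```

Because the message then carries only an integer, comparing two queued messages never touches a DataFrame, and the messages stay small.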
