Skip to content

fix: handle decode errors #375

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 20, 2021

Conversation

thomazthz
Copy link
Contributor

I noticed that the worker consumer thread hangs indefinitely when an exception is raised from the message decoder. The consumer does not know how to handle it because the message has not been created yet.

So it enters in the loop:

  1. Tries to read a message
  2. Decode fails
  3. Consumer thread breaks and closes the broker's connection
  4. Message is requeued
  5. Consumer thread wake up and tries to read the same message again

I tried to follow the suggested approach presented in #284, but I couldn't make it work.

I think the encode side is safe because we can handle exceptions on the publisher side.

--

Note that the PR changes only applies to RabbitMQ broker. For Redis, I used the JSONEncoder as a fallback to access redis_message_id and rejects the message since it is not available after the exception occurs. But I don't know if it's the right approach.

diff --git a/dramatiq/brokers/redis.py b/dramatiq/brokers/redis.py
index 2aad9b2..9808319 100644
--- a/dramatiq/brokers/redis.py
+++ b/dramatiq/brokers/redis.py
@@ -27,7 +27,8 @@ import redis

 from ..broker import Broker, Consumer, MessageProxy
 from ..common import compute_backoff, current_millis, dq_name
-from ..errors import ConnectionClosed, QueueJoinTimeout
+from ..errors import ConnectionClosed, DecodeError, QueueJoinTimeout
+from ..encoder import JSONEncoder
 from ..logging import get_logger
 from ..message import Message

@@ -360,6 +361,11 @@ class _RedisConsumer(Consumer):
                         self.misses, backoff_ms = compute_backoff(self.misses, max_backoff=self.timeout)
                         time.sleep(backoff_ms / 1000)
                         return None
+                except DecodeError:
+                    message = Message(**JSONEncoder().decode(data))
+                    self.logger.warning("Failed to decode message", exc_info=True)
+                    self.nack(message)
+
         except redis.ConnectionError as e:
             raise ConnectionClosed(e) from None
Log from consumer thread
worker.py                  275 CRITICAL Consumer encountered an unexpected error.
Traceback (most recent call last):
  File "/home/thomaz/dev/python/dramatiq/dramatiq/worker.py", line 259, in run
    for message in self.consumer:
  File "/home/thomaz/dev/python/dramatiq/dramatiq/brokers/rabbitmq.py", line 492, in __next__
    message = Message.decode(body)
  File "/home/thomaz/dev/python/dramatiq/dramatiq/message.py", line 96, in decode
    return cls(**global_encoder.decode(data))
  File "/home/thomaz/dev/python/dramatiq/tests/test_rabbitmq.py", line 471, in decode
    raise RuntimeError("xfail")
RuntimeError: xfail
[2021-01-18 13:24:37,328] [Thread-35] [dramatiq.worker.ConsumerThread(default)] [INFO] Restarting consumer in 3.00 seconds.
[2021-01-18 13:24:37,328] [Thread-35] [dramatiq.brokers.rabbitmq._RabbitmqConsumer] [ERROR] Failed to wait for all callbacks to complete.  This can happen when the RabbitMQ server is suddenly restarted.
Traceback (most recent call last):
  File "/home/thomaz/dev/python/dramatiq/dramatiq/brokers/rabbitmq.py", line 507, in close
    self.connection.add_callback_threadsafe(all_callbacks_handled.set)
  File "/home/thomaz.soares/.virtualenvs/dramatiq/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 744, in add_callback_threadsafe
    'BlockingConnection.add_callback_threadsafe() called on '
pika.exceptions.ConnectionWrongStateError: BlockingConnection.add_callback_threadsafe() called on closed or closing connection.
rabbitmq.py                512 ERROR    Failed to wait for all callbacks to complete.  This can happen when the RabbitMQ server is suddenly restarted.
Traceback (most recent call last):
  File "/home/thomaz/dev/python/dramatiq/dramatiq/brokers/rabbitmq.py", line 507, in close
    self.connection.add_callback_threadsafe(all_callbacks_handled.set)
  File "/home/thomaz.soares/.virtualenvs/dramatiq/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 744, in add_callback_threadsafe
    'BlockingConnection.add_callback_threadsafe() called on '
pika.exceptions.ConnectionWrongStateError: BlockingConnection.add_callback_threadsafe() called on closed or closing connection.
[2021-01-18 13:24:40,347] [Thread-35] [dramatiq.worker.ConsumerThread(default)] [CRITICAL] Consumer encountered an unexpected error.
Traceback (most recent call last):
  File "/home/thomaz/dev/python/dramatiq/dramatiq/worker.py", line 259, in run
    for message in self.consumer:
  File "/home/thomaz/dev/python/dramatiq/dramatiq/brokers/rabbitmq.py", line 492, in __next__
    message = Message.decode(body)
  File "/home/thomaz/dev/python/dramatiq/dramatiq/message.py", line 96, in decode
    return cls(**global_encoder.decode(data))
  File "/home/thomaz/dev/python/dramatiq/tests/test_rabbitmq.py", line 471, in decode
    raise RuntimeError("xfail")
RuntimeError: xfail

When a message fails to be decoded the consumer thread can't do
anything to recovery it because the message has not been created yet.

To prevents the worker consumer thread to continuously break, this
change try to handle any error from decode and nack the message.
@Bogdanp Bogdanp merged commit 6b0b373 into Bogdanp:master Jan 20, 2021
@Bogdanp
Copy link
Owner

Bogdanp commented Jan 20, 2021

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants