
Consumers do not resume consuming tasks after broker connection error #207

Closed

berkinmalkoc opened this issue Apr 10, 2019 · 7 comments

@berkinmalkoc commented Apr 10, 2019


What OS are you using?

Ubuntu 18.04

What version of Dramatiq are you using?

1.5.0

(daemonized via supervisord)

What did you do?

With several tasks still in the queue, I killed the Redis master process to trigger a failover to the slave. I observed the following during the ~5-second failover.

What did you expect would happen?

In our own code, we handle connection errors gracefully, as in:

    import time

    import redis

    t1 = time.time()
    while time.time() - t1 < 30:  # retry for up to 30 seconds
        try:
            redis_broker.enqueue(message)
            break  # enqueued successfully; stop retrying
        except redis.exceptions.ConnectionError:
            print('LOG: Cannot establish connection with the broker.')
        time.sleep(1)

I expected the consumer (worker) to resume consuming tasks from its queue once the connection problem was resolved (which eventually happened).

What happened?

I got the following error and the consumers ceased consuming tasks from the queue:

[2019-04-10 18:56:30,580] [PID 2717] [Thread-4] [dramatiq.worker.ConsumerThread(engine-queue.DQ)] [CRITICAL] Consumer encountered a connection error: Error while reading from socket: ('Connection closed by server.',)
[2019-04-10 18:56:30,580] [PID 2717] [Thread-4] [dramatiq.worker.ConsumerThread(engine-queue.DQ)] [INFO] Restarting consumer in 3.00 seconds.
LOG: Cannot establish connection with the broker.
Exception in thread Thread-5:
Traceback (most recent call last):
  File "/home/user/miniconda3/envs/my-env/lib/python3.6/site-packages/redis/connection.py", line 185, in _read_from_socket
    raise socket.error(SERVER_CLOSED_CONNECTION_ERROR)
OSError: Connection closed by server.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/user/miniconda3/envs/my-env/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/user/miniconda3/envs/my-env/lib/python3.6/site-packages/dramatiq/worker.py", line 409, in run
    self.process_message(message)
  File "/home/user/miniconda3/envs/my-env/lib/python3.6/site-packages/dramatiq/worker.py", line 451, in process_message
    self.consumers[message.queue_name].post_process_message(message)
  File "/home/user/miniconda3/envs/my-env/lib/python3.6/site-packages/dramatiq/worker.py", line 332, in post_process_message
    self.consumer.ack(message)
  File "/home/user/miniconda3/envs/my-env/lib/python3.6/site-packages/dramatiq/brokers/redis.py", line 287, in ack
    self.broker.do_ack(self.queue_name, message.options["redis_message_id"])
  File "/home/user/miniconda3/envs/my-env/lib/python3.6/site-packages/dramatiq/brokers/redis.py", line 259, in do_dispatch
    return dispatch(args=args, keys=keys)
  File "/home/user/miniconda3/envs/my-env/lib/python3.6/site-packages/redis/client.py", line 3575, in __call__
    return client.evalsha(self.sha, len(keys), *args)
  File "/home/user/miniconda3/envs/my-env/lib/python3.6/site-packages/redis/client.py", line 2761, in evalsha
    return self.execute_command('EVALSHA', sha, numkeys, *keys_and_args)
  File "/home/user/miniconda3/envs/my-env/lib/python3.6/site-packages/redis/client.py", line 775, in execute_command
    return self.parse_response(connection, command_name, **options)
  File "/home/user/miniconda3/envs/my-env/lib/python3.6/site-packages/redis/client.py", line 789, in parse_response
    response = connection.read_response()
  File "/home/user/miniconda3/envs/my-env/lib/python3.6/site-packages/redis/connection.py", line 637, in read_response
    response = self._parser.read_response()
  File "/home/user/miniconda3/envs/my-env/lib/python3.6/site-packages/redis/connection.py", line 290, in read_response
    response = self._buffer.readline()
  File "/home/user/miniconda3/envs/my-env/lib/python3.6/site-packages/redis/connection.py", line 224, in readline
    self._read_from_socket()
  File "/home/user/miniconda3/envs/my-env/lib/python3.6/site-packages/redis/connection.py", line 199, in _read_from_socket
    (e.args,))
redis.exceptions.ConnectionError: Error while reading from socket: ('Connection closed by server.',)
[2019-04-10 18:56:32,092] [PID 2717] [Thread-3] [dramatiq.worker.ConsumerThread(engine-queue)] [CRITICAL] Consumer encountered a connection error: Error while reading from socket: ('Connection closed by server.',)
[2019-04-10 18:56:32,092] [PID 2717] [Thread-3] [dramatiq.worker.ConsumerThread(engine-queue)] [INFO] Restarting consumer in 3.00 seconds.
[2019-04-10 18:56:33,584] [PID 2717] [Thread-4] [dramatiq.worker.ConsumerThread(engine-queue.DQ)] [CRITICAL] Consumer encountered a connection error: Error while reading from socket: ('Connection closed by server.',)

In other cases, where no unhandled exception arises (because no low-level Dramatiq code happens to be running during the connection error), the workers immediately resume consuming tasks once the connection is re-established.
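To make the failure mode concrete, here is a minimal, runnable sketch (illustrative only, not Dramatiq's actual source): a consumer-style thread that wraps its loop in a connection-error handler survives and restarts, while a worker-style thread that hits the same error with no handler dies with an unhandled exception, just like Thread-5 in the traceback above.

    import threading
    import time

    class FakeConnectionError(Exception):
        """Stands in for redis.exceptions.ConnectionError."""

    def flaky_redis_call():
        raise FakeConnectionError("Error while reading from socket")

    def consumer_loop():
        # Consumer-style thread: handles connection errors and restarts.
        for _ in range(2):  # bounded so the demo terminates
            try:
                flaky_redis_call()  # fetch a message
            except FakeConnectionError as e:
                print(f"consumer: {e}; restarting in 3.00 seconds")
                time.sleep(0.1)  # shortened for the demo

    def worker_loop():
        # Worker-style thread: acks with no handler, so the error propagates
        # and the thread dies ("Exception in thread ..."), as in the traceback.
        flaky_redis_call()

    threading.Thread(target=consumer_loop, name="ConsumerThread").start()
    threading.Thread(target=worker_loop, name="WorkerThread").start()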

Additional info: here is the output of `redis-cli monitor | grep engine-queue`:

1554916464.892525 [0 lua] "lpop" "dramatiq:engine-queue.DQ"
1554916465.508942 [0 127.0.0.1:46574] "EVALSHA" "605aad1746512fb86e8ee6fa6129dccdf32eed59" "1" "dramatiq" "fetch" "1554916465508" "engine-queue.DQ" "65499115-8d76-4aa1-a4a1-587295ab4694" "60000" "604800000" "0" "1000"
1554916465.509014 [0 lua] "lpop" "dramatiq:engine-queue.DQ"
1554916466.367562 [0 127.0.0.1:46574] "EVALSHA" "605aad1746512fb86e8ee6fa6129dccdf32eed59" "1" "dramatiq" "fetch" "1554916466367" "engine-queue.DQ" "65499115-8d76-4aa1-a4a1-587295ab4694" "60000" "604800000" "0" "1000"
1554916466.367638 [0 lua] "lpop" "dramatiq:engine-queue.DQ"
1554916467.148150 [0 127.0.0.1:46574] "EVALSHA" "605aad1746512fb86e8ee6fa6129dccdf32eed59" "1" "dramatiq" "fetch" "1554916467147" "engine-queue.DQ" "65499115-8d76-4aa1-a4a1-587295ab4694" "60000" "604800000" "0" "1000"
1554916467.148233 [0 lua] "lpop" "dramatiq:engine-queue.DQ"
1554916467.949773 [0 127.0.0.1:46574] "EVALSHA" "605aad1746512fb86e8ee6fa6129dccdf32eed59" "1" "dramatiq" "fetch" "1554916467949" "engine-queue.DQ" "65499115-8d76-4aa1-a4a1-587295ab4694" "60000" "604800000" "0" "1000"
1554916467.949852 [0 lua] "lpop" "dramatiq:engine-queue.DQ"
1554916468.725346 [0 127.0.0.1:46574] "EVALSHA" "605aad1746512fb86e8ee6fa6129dccdf32eed59" "1" "dramatiq" "fetch" "1554916468724" "engine-queue.DQ" "65499115-8d76-4aa1-a4a1-587295ab4694" "60000" "604800000" "0" "1000"

So it seems the workers keep polling the delay queue (engine-queue.DQ) but stop checking the main queue (engine-queue) itself for tasks.

@Bogdanp (Owner) commented Apr 11, 2019

I need to see what happened after the final two log lines. Consumers restart after a timeout whenever one of these failures occurs. Did that not happen for you?

@berkinmalkoc (Author) commented Apr 11, 2019

> I need to see what happened after the final two log lines. Consumers restart after a timeout whenever one of these failures occurs. Did that not happen for you?

Here is the rest of the logs (these lines came after I restarted Dramatiq via supervisord):

[2019-04-10 19:10:29,057] [PID 2713] [MainThread] [dramatiq.MainProcess] [INFO] Sending signal 15 to worker processes...
[2019-04-10 19:10:29,058] [PID 2717] [MainThread] [dramatiq.WorkerProcess(0)] [INFO] Stopping worker process...
[2019-04-10 19:10:29,468] [PID 2717] [MainThread] [dramatiq.worker.Worker] [INFO] Shutting down...
[2019-04-10 19:10:30,349] [PID 2717] [MainThread] [dramatiq.worker.Worker] [INFO] Worker has been shut down.
[2019-04-10 19:10:30,964] [PID 7381] [MainThread] [dramatiq.MainProcess] [INFO] Dramatiq '1.5.0' is booting up.
[2019-04-10 19:10:31,006] [PID 7385] [MainThread] [dramatiq.WorkerProcess(0)] [INFO] Worker process is ready for action.

This is what happens: Dramatiq does indeed restart the workers, but they do not seem to resume consuming tasks from the main queue, although I can see their heartbeats in Redis. (They do resume normal functioning after a manual restart.)

@Bogdanp (Owner) commented Apr 11, 2019

So between the logs in the last comment and these logs it looks like you waited for about 15 minutes before restarting. Is that correct?

If that's the case, I'll try to reproduce and fix this sometime this weekend. We do have unit tests that check what happens when RMQ suddenly goes away, but I don't remember if we have anything like that for Redis.
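A sketch of the kind of regression test this suggests (hypothetical, not Dramatiq's actual test suite): a fake broker raises connection errors for a while and then recovers, and the consumer loop must survive the errors and still drain the queue.

    import queue

    class FlakyBroker:
        """Raises ConnectionError a few times, then serves queued items."""

        def __init__(self, failures, items):
            self.failures = failures
            self.items = queue.Queue()
            for item in items:
                self.items.put(item)

        def fetch(self):
            if self.failures > 0:
                self.failures -= 1
                raise ConnectionError("Connection closed by server.")
            return self.items.get_nowait()

    def test_consumer_survives_connection_errors():
        broker = FlakyBroker(failures=3, items=["a", "b"])
        consumed = []
        while len(consumed) < 2:
            try:
                consumed.append(broker.fetch())
            except ConnectionError:
                pass  # the consumer "restarts" (retry delay omitted)
        assert consumed == ["a", "b"]

    test_consumer_survives_connection_errors()  # or let pytest collect it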

@Bogdanp Bogdanp added the bug label Apr 11, 2019

@Bogdanp Bogdanp added this to the v1.6.0 milestone Apr 11, 2019

@berkinmalkoc (Author) commented Apr 11, 2019

> So between the logs in the last comment and these logs it looks like you waited for about 15 minutes before restarting. Is that correct?

Exactly.

> If that's the case, I'll try to reproduce and fix this sometime this weekend. We do have unit tests that check what happens when RMQ suddenly goes away, but I don't remember if we have anything like that for Redis.

Thank you.

Let me note that, during those 15 minutes of waiting, engine-queue.DQ kept being LPOPed, whereas the main queue itself (engine-queue) saw no interaction at all, as can be seen in the Redis monitor output I shared earlier.
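One quick way to confirm this from the Redis side (a sketch; the key names are inferred from the monitor output above, and the connection parameters are assumed defaults):

    import redis

    r = redis.Redis()  # assumes the default localhost:6379 instance

    # The main queue should still hold the stuck messages, while the
    # delay queue keeps being polled (and stays empty).
    for key in ("dramatiq:engine-queue", "dramatiq:engine-queue.DQ"):
        print(key, "->", r.llen(key))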

@Bogdanp (Owner) commented Apr 11, 2019

@berkinmalkoc would you mind giving #209 a try?

@Bogdanp Bogdanp closed this in #209 Apr 15, 2019

@berkinmalkoc (Author) commented Apr 15, 2019

> @berkinmalkoc would you mind giving #209 a try?

@Bogdanp, thank you. The problem seems to have been resolved, according to my preliminary tests.

I will update if any issues arise in further tests.

@Bogdanp (Owner) commented Apr 15, 2019

Glad to hear it! I'll release 1.6.0 containing the fix later this week.
