How to handle connection interruption/timeout in PubSub listen() ? #386

Closed
janeczku opened this Issue Oct 5, 2013 · 11 comments

Comments

@janeczku

janeczku commented Oct 5, 2013

What happens when the connection to Redis goes down or times out while listening for PubSub messages via a long-running "pubsub.listen()"?
Any tips on how I can gracefully handle / restart an interrupted connection?
I do appreciate your help!

This is the relevant code:

import threading

from redis.exceptions import ConnectionError


class Listener(threading.Thread):
    def __init__(self, r, channels):
        threading.Thread.__init__(self)
        self.redis = r
        self.pubsub = self.redis.pubsub()
        try:
            self.pubsub.subscribe(channels)
        except ConnectionError:
            print(self, "Redis is down")

    def work(self, item):
        print(item['channel'], ":", item['data'])

    def run(self):
        # Blocks inside listen() waiting for messages; this is where a
        # dropped connection would surface.
        for item in self.pubsub.listen():
            if item['data'] == "KILL":
                self.pubsub.unsubscribe()
                print(self, "unsubscribed and finished")
                break
            else:
                self.work(item)
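
Is something like the following retry loop the right way to go? This is only a rough sketch, assuming redis-py raises redis.exceptions.ConnectionError out of listen() when the socket drops; the retry delay and message handling are placeholders:

import time

import redis
from redis.exceptions import ConnectionError

def listen_forever(r, channels, retry_delay=5):
    # Illustrative retry loop: re-subscribe whenever the connection drops.
    while True:
        pubsub = r.pubsub()
        try:
            pubsub.subscribe(channels)
            for item in pubsub.listen():
                if item['type'] != 'message':
                    continue
                # With decode_responses=True data arrives as str;
                # otherwise compare against b"KILL".
                if item['data'] == "KILL":
                    pubsub.unsubscribe()
                    return
                print(item['channel'], ":", item['data'])
        except ConnectionError:
            # Connection dropped or timed out while blocked in listen();
            # wait a bit and subscribe again.
            print("lost connection to Redis, retrying in %ss" % retry_delay)
            time.sleep(retry_delay)
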
@amezhenin

amezhenin commented Oct 15, 2013

There is a socket_timeout option that is supposed to set timeouts on socket operations:

redis_conn = redis.StrictRedis(host=HOST, port=PORT, db=DB, socket_timeout=5)

Actually, I didn't find any strict description of this option in the documentation, but I should say that it didn't work as expected. I'm using a long-running "pubsub.listen()" in production and it can hang unexpectedly forever.

Currently, I'm working on some kind of "pubsub.listen()"-thread + watchdog-thread implementation for my script. This should give me information about the whole script's behavior, not only socket behavior.
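
Roughly the shape I have in mind; this is only a rough sketch, and the channel name, idle threshold and print calls are placeholders:

import threading
import time

import redis

# The listener stamps a shared timestamp on every message; the watchdog
# flags (or restarts) the subscription if the timestamp goes stale.
last_activity = time.time()
lock = threading.Lock()

def listener(pubsub):
    global last_activity
    for item in pubsub.listen():
        with lock:
            last_activity = time.time()
        print(item)  # replace with real message handling

def watchdog(max_idle=60):
    while True:
        time.sleep(max_idle / 2)
        with lock:
            idle = time.time() - last_activity
        if idle > max_idle:
            # Listener has been silent too long: log it, ping Redis,
            # or tear down and recreate the subscription here.
            print("listener idle for %.0fs" % idle)

r = redis.StrictRedis()
pubsub = r.pubsub()
pubsub.subscribe('some-channel')

threading.Thread(target=listener, args=(pubsub,), daemon=True).start()
threading.Thread(target=watchdog, daemon=True).start()
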

@andymccurdy andymccurdy added the pubsub label Apr 1, 2014

@andymccurdy

Owner

andymccurdy commented Apr 1, 2014

Hi. Many apologies for taking so long to clean up Publish/Subscribe in redis-py. I've recently had some time to make some big improvements and I'd love to get your feedback.

You can review the changes in the pubsub branch here: https://github.com/andymccurdy/redis-py/tree/pubsub

I've written a section in the README docs about PubSub that covers most (hopefully all!) use cases. You can find it here: https://github.com/andymccurdy/redis-py/blob/pubsub/README.rst#publish--subscribe

My current plan is to merge the pubsub branch into master and roll a new release of redis-py by the end of the week. Again, I'd love any feedback.
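
As a quick taste, the new API lets you poll for messages instead of blocking forever in listen() — something along these lines, assuming the get_message() API described in the README (the channel name and timeout are placeholders):

import redis

r = redis.StrictRedis()
p = r.pubsub(ignore_subscribe_messages=True)
p.subscribe('my-channel')

while True:
    # Poll instead of blocking forever in listen(); returns None when
    # there is nothing to read within the timeout.
    message = p.get_message(timeout=1.0)
    if message:
        print(message['channel'], message['data'])
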

@endophage

endophage commented Apr 24, 2014

This won't fix the hang problem. The permanent hang is easy to recreate. Run your redis-server in the foreground of your terminal, put a breakpoint immediately before the while loop in SocketBuffer._read_from_socket(), then run the commands in this gist in your interpreter: https://gist.github.com/endophage/11238000

Once you're listening, kill your redis-server. You'll find the while loop gets stuck in an infinite loop where self._sock.recv(chunksize) immediately returns an empty string every time it's called. I would expect a socket.error to be raised but it isn't (points big foam blame finger at core devs). The only way I think this can sensibly be detected is to count how many times you've looped and received the empty string, pick a number of loops after which you decide you've failed, and then attempt to send the PING command to see if the redis server is still reachable.

This sounds like a pretty awful solution, but if the redis-server is still there you should be left waiting on self._sock.recv() to return (it blocks until data is available), so consistently getting an empty string back is a good sign the connection is no longer valid.
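
To illustrate what I mean, here is a toy read loop (not redis-py's actual code): on a blocking socket an empty recv() means the peer hung up, so the loop has to treat that as a failure instead of spinning:

def read_exactly(sock, length):
    # On a blocking socket, recv() returning an empty byte string means
    # the peer closed the connection, so bail out instead of looping.
    buf = b''
    while len(buf) < length:
        chunk = sock.recv(4096)
        if not chunk:
            raise ConnectionError("connection to Redis lost")
        buf += chunk
    return buf
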

@guanbo

guanbo commented Apr 24, 2014

@endophage +1 same problem

@endophage

endophage commented Apr 24, 2014

I have a fix for the hang but it's an ugly ass hack (basically the PING thing I described above, but crow-barred in just to confirm it works). We're experiencing this as a production issue at my work (fortunately in a non-critical but highly prized component), so I'll be actively working on a more elegant fix over the next couple of days.

@andymccurdy

Owner

andymccurdy commented Apr 29, 2014

Thanks @endophage. I've merged this in.

@sriman

sriman commented Jan 20, 2016

hi @andymccurdy

import json
import sys
import traceback
from datetime import datetime

# rtunnel (a Redis client), write_stdout, write_stderr and
# self.sendPushNotification are defined elsewhere in the application.
trigger = rtunnel.pubsub()
trigger.subscribe('PUSH_NOTIFICATION_GCM_REQUEST')

while True:
    try:
        for request in trigger.listen():
            write_stdout('\n\n Daemon Start: %s - %s UTC' % (request, datetime.utcnow()))

            if request['type'] == 'message':
                queue = rtunnel.lrange("PUSH_NOTIFICATION_GCM_QUEUE", 0, -1)

                for message in queue:
                    message = json.loads(message)

                    if self.sendPushNotification(message):
                        rtunnel.lpop("PUSH_NOTIFICATION_GCM_QUEUE")
                        write_stdout('\n\nSuccess: \n  message: %s at %s UTC' % (message, datetime.utcnow()))
                    else:
                        write_stderr('\n\nError: \n  message: %s at %s UTC' % (message, datetime.utcnow()))

            write_stdout('\n\n Daemon End - %s UTC' % datetime.utcnow())

    except (KeyboardInterrupt, SystemExit):
        sys.exit()
    except Exception:
        write_stderr('\n\nException %s occurred at %s UTC' % (traceback.format_exc(), datetime.utcnow()))

I am using AWS ElastiCache and running this code under supervisor. My problem is that the listener stops listening if it is kept quiet for a long time (say 4 or 5 days): once we publish messages after that, the listener does not react, i.e. the "trigger.listen():" loop is never executed.

Will the listener time out after a certain number of hours or days?

Once we rerun the above code, the listener works perfectly again (it receives the published messages).

We actually have 2 nodes in AWS ElastiCache, but for this pubsub we are using the master node, so there should be no chance of disconnecting.

We are using redis-py version 2.10.3.

Any help will be greatly appreciated.
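
Would something like the following help, i.e. polling with get_message() and sending a periodic PING on the pubsub connection so the socket never sits idle for days? This is only a sketch, assuming PubSub.ping() and get_message() are available in our redis-py version; the endpoint, channel and intervals are placeholders:

import time
import redis

r = redis.StrictRedis(host='your-elasticache-endpoint', port=6379)
p = r.pubsub(ignore_subscribe_messages=True)
p.subscribe('PUSH_NOTIFICATION_GCM_REQUEST')

last_ping = time.time()
while True:
    message = p.get_message(timeout=1.0)
    if message and message['type'] == 'message':
        print(message)  # hand off to the real handler here
    if time.time() - last_ping > 30:
        p.ping()  # keep-alive on the pubsub socket (assumed available)
        last_ping = time.time()
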

@tr3buchet

tr3buchet commented Feb 4, 2016

I too am having issues with the subscriber just not listening according to the publisher (the result of the publish is a 0 instead of 1). I've taken steps to ensure that the thread with the subscribed client is running and will restart if it stops, and the thread is running just fine. Everything appears to be in order aside from the fact that the publish returns a 0, indicating nothing is listening. Bouncing the service doing the subscribing fixes it, but that is not a viable solution. As @sriman mentioned above, it takes a few days to happen and I can't put my finger on it, but it has happened many, many times since we started using redis pubsub months ago.

@yuzhushang

yuzhushang commented Apr 28, 2018

hi @sriman
I have the same problem. Have you solved it since?
Can you share your solution?
Any help will be greatly appreciated.

@sriman

sriman commented Apr 28, 2018

@yuzhushang

yuzhushang commented Aug 31, 2018

@sriman
thanks, I will try
