Server response periodically blocks? #136

Closed
lordnynex opened this issue Feb 6, 2016 · 11 comments

@lordnynex

Hello,

I will provide a reproduction example, but first, I am aware that what I'm doing isn't normal. I believe technically, this should be able to work.

It seems that sending return values periodically blocks indefinitely. I'm not sure why this is occurring.

If you run the below example and run

for i in {1..100} ; do zerorpc --timeout 1 tcp://localhost:5559 lolita ; done

You'll notice that roughly every 5th request results in a timeout. The debug output in the server shows that the RPC method is executed, though, and it only appears to block while sending the response to the broker.
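The shell loop above can also be scripted so that the timeouts are counted rather than eyeballed. This is a minimal sketch in Python 3, assuming the zerorpc Python client is installed and the broker from this report listens on tcp://localhost:5559. `count_failures` and `run_against_broker` are hypothetical helper names, not part of zerorpc. Unlike the CLI loop, it reuses a single client for all attempts, so results may differ slightly.

```python
def count_failures(call, attempts, expected_errors=(Exception,)):
    """Invoke call() `attempts` times; return the 1-based indices that raised."""
    failed = []
    for attempt in range(1, attempts + 1):
        try:
            call()
        except expected_errors:
            failed.append(attempt)
    return failed


def run_against_broker(endpoint="tcp://localhost:5559", attempts=100):
    """Call lolita() repeatedly through the broker and report which calls timed out."""
    # Imported here so count_failures stays usable without zerorpc installed.
    import zerorpc

    client = zerorpc.Client(timeout=1)  # same 1 s timeout as the CLI loop
    client.connect(endpoint)
    try:
        return count_failures(
            client.lolita, attempts, expected_errors=(zerorpc.TimeoutExpired,)
        )
    finally:
        client.close()
```

Calling `run_against_broker()` while main.py is running should return the list of attempt numbers that timed out; an empty list means every response came back within one second.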

Is there a better way to do this?

# main.py 
# A lot of debugging junk in here
import gevent
from gevent import monkey

monkey.patch_all()

import gipc

from zmq import green as zmq

import zerorpc
import logging
import multiprocessing as mp

from zerorpc.socket import SocketBase
from zerorpc.core import ServerBase
from .util import zhelpers as zhelpers

class SrvBase(ServerBase, SocketBase):
    def __init__(self, methods=None, name=None, context=None, pool_size=1000, heartbeat=5):
        SocketBase.__init__(self, zmq.XREP, context)
        if methods is None:
            methods = self

        name = name or ServerBase._extract_name(methods)
        methods = ServerBase._filter_methods(SrvBase, self, methods)
        ServerBase.__init__(self, self._events, methods, name, context, pool_size, heartbeat)

        # print "DEBUG TEST"
        # print self.__dict__

    def close(self):
        ServerBase.close(self)
        SocketBase.close(self)

class RPCWorker(SrvBase):
    def lolita(self):
        return 42

    def add(self, a, b):
        return a + b

def initializer(url="tcp://127.0.0.1:5560"):
    def do_work():
        class MySrv(SrvBase):
            def lolita(self):
                print "ME"
                return 42

            def add(self, a, b):
                return a + b

        srv = MySrv()
        print "BREAK"
        srv.connect(url)
        print "BREAK1"
        gevent.spawn(srv.run)
        print "BREAK2"
        while True:
            gevent.sleep(0.50)

    monkey.patch_all()

    THREADPOOL = gevent.get_hub().threadpool
    threads = []
    for i in range(5):
        t = THREADPOOL.spawn(do_work)
        threads.append( t )

    print threads

    while True:
        # print "TICK"
        gevent.sleep(0.50)

class DataPool:
    def __init__(self):
        self.ctx = zmq.Context.instance()
        try:
            # Socket facing clients
            self.frontend = self.ctx.socket(zmq.XREP)
            self.frontend.bind("tcp://127.0.0.1:5559")

            # Socket facing services
            self.backend = self.ctx.socket(zmq.XREQ)
            self.backend.bind("tcp://127.0.0.1:5560")

            # Socket poller
            self.poller = zmq.Poller()
            self.poller.register(self.frontend, zmq.POLLIN)
            self.poller.register(self.backend, zmq.POLLIN)

            # Start pool
            # self.pool = mp.Pool(processes=4, initializer=initializer)
            procs = []
            for i in xrange(10):
                p = gipc.start_process(initializer)
                procs.append(p)
            # p = mp.Process(target=initializer)
            # p.start()
        except Exception, e:
            raise

    def run(self, *args, **kwargs):
        gevent.spawn(self.run_loop)

    def run_loop(self):
        try:
            while True:
                # print "TICK"
                socks = dict(self.poller.poll(100))

                gevent.sleep(0)
                if socks.get(self.frontend) == zmq.POLLIN:
                    message = self.frontend.recv(zmq.NOBLOCK)
                    more = self.frontend.getsockopt(zmq.RCVMORE)
                    if more:
                        print "Rtr<<",
                        gevent.spawn(zhelpers.dump_part, message)
                        self.backend.send(message, zmq.SNDMORE)
                        print "SENDMORE"
                        gevent.sleep(0)
                    else:
                        print "Rtr<.",
                        gevent.spawn(zhelpers.dump_part, message)
                        self.backend.send(message, zmq.NOBLOCK)

                gevent.sleep(0)
                if socks.get(self.backend) == zmq.POLLIN:
                    message = self.backend.recv(zmq.NOBLOCK)
                    more = self.backend.getsockopt(zmq.RCVMORE)
                    if more:
                        print "Rtr>>",
                        gevent.spawn(zhelpers.dump_part, message)
                        self.frontend.send(message, zmq.SNDMORE)
                        print "RECEIVEMORE"
                        gevent.sleep(0)
                    else:
                        print "Rtr>.",
                        gevent.spawn(zhelpers.dump_part, message)
                        self.frontend.send(message, zmq.NOBLOCK)
                gevent.sleep(0)
        except Exception, e:
            print e
            print "bringing down zmq device"
        finally:
            pass
            self.frontend.close()
            self.backend.close()
            self.ctx.term()
# util.zhelpers
from random import randint

import zmq
import msgpack


# Receives all message parts from socket, prints neatly
def dump(zsocket):
    print "----------------------------------------"
    for part in zsocket.recv_multipart():
        dump_part(part)

def dump_part(part):
    # print part
    try:
        # dec_part = format_part( part )
        # dec_part = msgpack.unpackb(part)
        unpacker = msgpack.Unpacker(encoding='utf-8')
        unpacker.feed(part)
        dec_part = unpacker.unpack()
    except Exception, e:
        print "Can't unpack %s" % e
        dec_part = format_part( part )
    finally:
        print "[%03d]" % len(part), dec_part
    # print "[%03d]" % len(part), format_part( part )

def format_part(part):
    if all(31 < ord(c) < 128 for c in part):
        return "'" + part + "'"
    else:
        return "0x" + "".join("%x" % ord(c) for c in part)

# Set simple random printable identity on socket
def set_id(zsocket):
    identity = "%04x-%04x" % (randint(0, 0x10000), randint(0, 0x10000))
    zsocket.setsockopt(zmq.IDENTITY, identity)
@faith0811
Contributor

What's your pyzmq version? Try upgrading pyzmq to 15.2.0 or downgrading to 13.1.0.
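One way to answer the version question for all three libraries at once is sketched below. It assumes Python 3.8 or newer for `importlib.metadata` (the report itself uses Python 2, where `pip freeze` or `zmq.pyzmq_version()` would be the equivalent). `installed_version` is a hypothetical helper name.

```python
from importlib import metadata


def installed_version(dist_name):
    """Return the installed version string of a distribution, or None if absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


# Print the versions relevant to this report.
for name in ("pyzmq", "gevent", "zerorpc"):
    print(name, installed_version(name) or "not installed")
```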

@lordnynex
Author

@faith0811

Hello,

This is using 15.2.0. If I downgrade to 13.1.0, I cannot perform an equivalent test because that version does not appear to have the XREP socket type. Any other ideas?

@lordnynex
Author

@faith0811

I reworked this a bit and downgraded to 13.1.0, and you are correct: this works as expected now. This should be considered a duplicate of #123.

@bombela
Member

bombela commented Feb 8, 2016

This seems to be another case of gevent monkey patching everything and interfering with pyzmq, then. I have no easy solution in mind.

@bombela
Member

bombela commented Feb 8, 2016

But why do you need to monkey patch in the first place? You are basically begging for trouble. Monkey patching is really crazy: it changes the behavior of some important functions. Either you write your code to be fully gevent-compliant or you don't. Of course, zerorpc-python requires gevent, and there is no version of zerorpc that works without gevent. Would a version independent of gevent fix your problems?

@bombela
Member

bombela commented Feb 8, 2016

Thanks to @faith0811 see zeromq/pyzmq#766.

In short: pyzmq has a gc thread to handle the lifetime of zerocopy messages. This thread uses a zmq context. When you gevent monkey patch the threading API, the gc thread does not use the green wrapper of the zmq context (zmq.green.Context), so the gc thread hangs.

The latest version of pyzmq does detect if the threading API was monkey patched, and uses zmq.green.Context in this case. This likely means you have to monkey patch the threading API early on, before pyzmq starts the gc thread.
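The ordering described above can be sketched as follows. This is a minimal illustration rather than a verified fix: it errs on the side of patching before pyzmq is imported at all, and it assumes gevent and pyzmq are installed. `imported_too_early` and `patch_then_import_zmq` are hypothetical helper names, not part of any library.

```python
import sys


def imported_too_early(names, loaded=None):
    """Return those of `names` that are already present in the module table."""
    loaded = sys.modules if loaded is None else loaded
    return [name for name in names if name in loaded]


def patch_then_import_zmq():
    """Monkey patch first, then import pyzmq's green API."""
    early = imported_too_early(["zmq"])
    if early:
        raise RuntimeError(
            "imported before monkey patching: %s" % ", ".join(early)
        )

    from gevent import monkey
    monkey.patch_all()  # patches threading, among other modules

    import zmq.green as zmq  # imported only after patching
    return zmq
```

Calling `patch_then_import_zmq()` as the first statement of the entry point makes an import-order mistake fail loudly instead of showing up later as a sporadic hang.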

@bombela closed this as completed Feb 8, 2016
@lordnynex
Author

@bombela I think (at least in my case) it is not enough for zerorpc.Context to use zmq.green. I experimented with this a bit over the weekend, and in my case it did not change the problem at all.

The fix from the issue @faith0811 pointed out (thank you!) works reliably. It also explains the lockup I was seeing when using threads.

Unfortunately, I think it is necessary to monkey patch early on in my framework, because it simply provides multiprocessing wrappers around an existing single-threaded code base. A lot of libs will be included indirectly.

@bombela
Member

bombela commented Feb 8, 2016

This has nothing to do with zerorpc; zerorpc is based on gevent and thus already works with it. You would encounter the same freeze without zerorpc. It happens when you combine gevent monkey patching of the threading API with zerocopy messages in pyzmq.

@bombela
Member

bombela commented Feb 8, 2016

And yes, @faith0811 provided the links to the real problem and solution; I was merely reporting it in this thread so it could be closed :)

@lordnynex
Author

You are correct that it is not a zerorpc issue. However, zerorpc could proactively fix this in its zmq.Context subclass by providing an initializer. Sorry, I should have explained better; I haven't been sleeping much.

@bombela
Member

bombela commented Feb 8, 2016

OK, I am not sure I follow 100%. Do you mean providing something like zerorpc.green.Context, similar to zmq.green.Context? Or do you mean providing a way to initialize a zerorpc.Context (declared as class Context(zmq.Context)) from any zmq.Context instance?

I could subclass zmq.green.Context maybe? But I am still failing to understand what it would fix.
