
python grpc server with multiprocessing fails #16001

Closed
p1c2u opened this issue Jul 12, 2018 · 27 comments
p1c2u commented Jul 12, 2018

What version of gRPC and what language are you using?

grpc 1.13.0 with python 3.6.5

What operating system (Linux, Windows, …) and version?

CentOS 7 (Linux 3.10.0-862.3.2.el7.x86_64)

What runtime / compiler are you using (e.g. python version or version of gcc)

CPython 3.6.5

What did you do?

server = grpc.server(futures.ProcessPoolExecutor(max_workers=4))
service_pb2_grpc.add_FindPortServicer_to_server(FindPort(), server)
server.add_insecure_port('[::]:' + port)
server.start()

What did you expect to see?

Based on doc https://github.com/grpc/grpc/blob/master/doc/fork_support.md
gRPC server should run on 4 processes

What did you see instead?

Server crash

8156 tcp_server_posix.cc:210]    Failed accept4: Invalid argument
Traceback (most recent call last):
  File "/usr/lib64/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib64/python3.6/multiprocessing/reduction.py", line 52, in dumps
    cls(buf, protocol).dump(obj)
  File "stringsource", line 2, in grpc._cython.cygrpc.RequestCallEvent.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__


@mehrdada
Member

@ericgribkoff is this a supported use-case?

@ericgribkoff
Contributor

No, this is not supported. futures.ProcessPoolExecutor forks in response to submitted tasks, i.e., after the server starts (gRPC Python sends tasks to the executor to handle incoming RPCs). This is not compatible with gRPC's fork support on the server side. You will need to pre-fork: fork your subprocesses first, then have each one separately start its own gRPC server instance.

> What did you expect to see?
>
> Based on doc https://github.com/grpc/grpc/blob/master/doc/fork_support.md
> gRPC server should run on 4 processes

@p1c2u Thanks for pointing this out. I see that our documentation could be a bit confusing if you're looking at using fork with gRPC servers: we should update that document to make it more clear that the fork-support there is for client-side usage. (cc @kpayson64)

@mohit-chawla

mohit-chawla commented Sep 25, 2018

[PYTHON] @ericgribkoff, I have run into a use case where I am using gRPC for a server-side CPU-bound task (and need to use Python multiprocessing, as there isn't another option in pure Python). Can you please suggest a solution here, as I see no server-side support for fork?

I am facing grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.INTERNAL, Received RST_STREAM with error code 2)>

@evanj How did you deal with #15334 (comment)?

@kriticism

@ericgribkoff , @evanj : I'm facing the same issue as @mohit-chawla

@yogin16

yogin16 commented Oct 26, 2018

+1

@ericgribkoff
Contributor

The following two approaches should help with combining gRPC Python servers and the fork() syscall (e.g., using Python's multiprocessing library). The main constraint that must be satisfied is that your process must only invoke the fork() syscall before you create your gRPC server(s).

Option 1:

If your environment supports the SO_REUSEPORT socket option, you can run multiple copies of your gRPC server in individual processes started via multiprocessing.Process, and all of the servers can listen on the same port. SO_REUSEPORT is set by default if you build gRPC from source on Linux, but it is not available in the manylinux spec used by the binaries we distribute on pip, so depending on your environment you may have to resort to option 2.

This type of pre-fork + SO_REUSEPORT would look something like the following:

from concurrent import futures
import multiprocessing

import grpc

def startGrpcServer():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    # register your servicer(s) here, then:
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()  # on older grpcio, sleep in a loop instead

for i in range(5):
    p = multiprocessing.Process(target=startGrpcServer)
    p.start()

Option 2:

Run a single gRPC Python server, but offload all CPU-intensive work to a multiprocessing.Pool started before you create the gRPC server.

This would look like the following:

pool = multiprocessing.Pool(processes=4)

# The implementation of your RPC method. This runs in the main process,
# but the work is done asynchronously by the pool of pre-forked workers.
def rpcMethod(self, request, context):
    result = pool.apply_async(someExpensiveFunction, (request,))
    return result.get(timeout=1)

server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
server.add_insecure_port('[::]:50051')
server.start()

@ferrouswheel

ferrouswheel commented Nov 11, 2018

Did this behaviour change recently?

I have a gRPC server invoking TensorFlow, but TensorFlow doesn't have any way to explicitly let go of GPU memory.

Thus my solution, when receiving a grpc request, was to spawn a new process to do the work, then let it terminate after the work was done.

I was pretty sure I had this working with grpc earlier in 2018, but now I get Failed accept4: Invalid argument... 🤔

(I can work around it in an awkward way, but I'd like to clarify if I'm going mad)

@ericgribkoff
Contributor

@ferrouswheel Using fork() in a gRPC server (without following it with an immediate exec()) to process incoming requests has never been supported. Depending on a variety of factors (including the version of gRPC and what polling engine was in use) it might have worked sometimes, but probably not often enough to appear functional through any non-trivial amount of usage.

However, if you were spawning a new process via fork()+exec(), that should have worked - and should continue to work.

@ferrouswheel

@ericgribkoff Thank you for the clarification. I must have just got lucky with a prior combination of factors.

@yifeikong

@ericgribkoff Do you mean that if we build an executor that pre-forks before the gRPC server starts, then it would work?

@ericgribkoff
Contributor

@yifeikong Not exactly: you will need to prefork before any calls into gRPC, including instantiation of the server.
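A minimal sketch of that ordering, assuming a fork-capable platform; the servicer registration and port are placeholders, and gRPC is deliberately touched only inside the children:

```python
import multiprocessing

def serve():
    # gRPC is imported and used only inside the child, after the fork,
    # so no gRPC state exists in the parent at fork time.
    from concurrent import futures
    import grpc

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=4))
    # register your servicer here before starting the server
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

def run_in_workers(target, count):
    # Fork all child processes before any call into gRPC in this process.
    procs = [multiprocessing.Process(target=target) for _ in range(count)]
    for p in procs:
        p.start()
    return procs

# Usage: fork first, then each child builds its own server:
#   workers = run_in_workers(serve, 4)
#   for p in workers:
#       p.join()
```

server.wait_for_termination() assumes a reasonably recent grpcio; an equivalent sleep loop works on older versions.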

@jshlbrd

jshlbrd commented Feb 28, 2019

@ericgribkoff any chance you can flesh out a more concrete example for multiprocessing.Pool? I'm not seeing how one could offload incoming calls to the pool (i.e. in your example, how would rpcMethod be referenced by the gRPC server?)

@lidizheng
Contributor

@jshlbrd I think Eric's option 1 answers your question better. You start a gRPC server in each worker process's function, and incoming traffic will be distributed automatically thanks to SO_REUSEPORT.
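For what it's worth, Eric's option 2 can be wired in by holding the pre-forked pool on the servicer instance. A rough sketch, with a stand-in request type and a hypothetical some_expensive_function; a real servicer would subclass the generated service_pb2_grpc base class:

```python
import multiprocessing

def some_expensive_function(value):
    # stand-in for the CPU-bound work
    return value * 2

class WidgetServicer:  # would subclass widget_pb2_grpc.WidgetServicer
    def __init__(self, pool):
        # the pool must be created before the gRPC server is instantiated
        self._pool = pool

    def WidgetRequest(self, request, context):
        # Offload to the pre-forked pool; this handler thread blocks on the
        # result while the work runs in a worker process.
        async_result = self._pool.apply_async(some_expensive_function, (request,))
        return async_result.get(timeout=30)
```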

@jshlbrd

jshlbrd commented Feb 28, 2019

@lidizheng -- thanks, yeah, that's what I thought, and I was able to get something like that working for my application. However, that spins up a gRPC server in each child process; I was curious whether it's possible to run one gRPC server in the main process and distribute the calls to a group of child processes. My main use for this is to support signals: you can't use signals from within threads, but you can from within processes.

@lidizheng
Contributor

@jshlbrd Unfortunately, the type of fork you are describing is not currently supported by the gRPC Python server. Fork support is currently only available for the gRPC Python client, and it is actually harder than people expect: we use a C extension that performs a lot of I/O without the GIL, across multiple threads with different responsibilities, and it is challenging to prevent CPython from deadlocking itself.

@gnossen
Contributor

gnossen commented Mar 1, 2019

It's worth noting that our manylinux wheels do not support SO_REUSEPORT, so for the moment, if you want to take advantage of this feature on Linux, you'll need to compile from source. You can do this by installing with pip install grpcio --no-binary grpcio.
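One way to sanity-check the OS side of this (note that this only tells you whether the platform exposes SO_REUSEPORT, not whether your grpcio build was compiled with it enabled):

```python
import socket

def os_supports_so_reuseport():
    # SO_REUSEPORT is present on Linux >= 3.9 with a sufficiently new
    # libc; the attribute is absent on platforms without support.
    return hasattr(socket, 'SO_REUSEPORT')
```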

@jshlbrd

jshlbrd commented Mar 6, 2019

I'd like to share this as a proposed workaround for this problem and seek feedback on this technique. Here's a proof-of-concept for how I've got this working (warning, some pseudo-code is present here):

from concurrent import futures
from concurrent.futures import TimeoutError

import grpc
import pebble

PROCESSES = 4
TASKS = 100

def make_widget(i):
    return i * 2

class WidgetServicer(widget_pb2_grpc.WidgetServicer):
    def __init__(self, pool):
        self.pool = pool

    def WidgetRequest(self, request, context):
        response = widget_pb2.Response()
        future = self.pool.schedule(make_widget, args=[request.Value],
                                    timeout=context.time_remaining())

        try:
            response.Widget = future.result()
        except TimeoutError:
            context.abort(grpc.StatusCode.DEADLINE_EXCEEDED, 'Timeout')  # handles process timeout
        except pebble.ProcessExpired:
            context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, 'Abnormal termination')  # handles process crashes

        return response

def main():
    pool = pebble.ProcessPool(max_workers=PROCESSES, max_tasks=TASKS)
    executor = futures.ThreadPoolExecutor(max_workers=PROCESSES)
    server = grpc.server(executor)
    servicer = WidgetServicer(pool)
    <start your server and block forever>

Here's how I think this works: when you set both the ProcessPool and the ThreadPoolExecutor to the same count, the gRPC servicer can only feed requests to the number of child processes that are available. If you were to increase the ThreadPoolExecutor count, requests would queue in the ProcessPool, creating a backlog of work that may never be delivered. Keeping the counts the same, the gRPC servicer maintains control over the life of each request and never queues them in the ProcessPool (if the number of requests exhausts the available processes, the client eventually gets a timeout).

There's also some error checking for timeouts and process crashes in the ProcessPool.

@gnossen
Contributor

gnossen commented Mar 6, 2019

@jshlbrd That's exactly right. To be precise, your maximum concurrency is min(thread_count, process_count). If the bottleneck is your thread count, then requests will queue here. If the bottleneck is your process count, then the queueing will happen at your call to self.pool.schedule.

Generally, you'll want the process count to match your processor count and you'll want your thread count to be equal to or higher than that (but not too much higher or you'll get thrashing from context switching). This will buy you the maximum possible concurrency.

By default, Python initializes a ProcessPoolExecutor with max_workers equal to your processor count, and a ThreadPoolExecutor with a max_workers value five times that.

It's worth noting that if the work done in your handler releases the GIL (e.g., by using a C extension like numpy), you probably don't need to spawn subprocesses at all.
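The sizing guidance above can be sketched as a small helper (choose_pool_sizes is a hypothetical name; tune the multiplier to your workload):

```python
import os

def choose_pool_sizes(threads_per_core=1):
    # Effective concurrency is min(thread_count, process_count), so match
    # processes to cores and keep threads at least equal to processes
    # (but not far beyond, to avoid context-switch thrashing).
    cpus = os.cpu_count() or 1
    process_count = cpus
    thread_count = max(process_count, cpus * threads_per_core)
    return process_count, thread_count
```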

@jshlbrd

jshlbrd commented Mar 6, 2019

Woohoo, thanks for the feedback!

You're correct with regard to possibly not needing subprocesses. In my case, the tasks being performed by the gRPC server are CPU-intensive and can range in time from sub-second to several minutes -- that's why I need to use subprocesses that can be timed out.

@gnossen
Contributor

gnossen commented Mar 13, 2019

Hopefully, this example will serve as a good reference for anyone who runs across this in the future.

@jshlbrd

jshlbrd commented Mar 13, 2019

That’s a good example of utilizing multiple cores when your server isn’t running CPU-intensive tasks, but I think it still may not address the need to natively support process pools and thread pools.

For example, when using that example as a template for CPU-intensive tasks, you could reduce the number of threads inside each gRPC server to one, but because each subprocess runs a different server, they aren’t working from a shared RPC queue, and (IIRC) SO_REUSEPORT appears to randomly select one of the connected servers; depending on your application, this can lead to uneven balancing across cores.

It probably goes without saying that no one should think of SO_REUSEPORT as anything more than a simple load balancer, but the value of a single RPC queue is lost in that example.

Edit: reading into SO_REUSEPORT more, it uses IP/port hashing to distribute packets to the servers. I’m curious whether that means any of these gRPC servers behind a load balancer (like Envoy) would see uneven distribution. I haven’t tested this, but it leads me to believe a more stable solution may be to use a local load balancer and spin up multiple gRPC servers on different ports.

@jshlbrd

jshlbrd commented Mar 13, 2019

Based on more testing, I'm now convinced that pre-forking via subprocess pools isn't a viable method for handling this (too many failure scenarios to deal with). Here's a summary of techniques that can work, with their pitfalls:

Pools (including multiprocessing.Pool, billiard.Pool):
Set up the pool before the gRPC server is started (pre-fork)
Must ensure that no subprocesses crash or stop, otherwise the pool becomes broken
Must ensure that no subprocesses are restarted, otherwise the main thread segfaults

Processes w/ SO_REUSEPORT:
Set up processes; each process runs its own gRPC server bound to the same port w/ SO_REUSEPORT (pre-fork)
SO_REUSEPORT uses IP/port hashing for load balancing and will lead to uneven task distribution if a single client sends many requests
SO_REUSEPORT may lead to uneven task distribution when used with a frontend proxy/load balancer (e.g. Envoy) (untested)
RPC queue not shared across processes

Processes w/o SO_REUSEPORT:
Set up processes; each process runs its own gRPC server bound to a unique port (pre-fork)
Requires a local load balancer to distribute tasks
RPC queue not shared across processes
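The last variant (unique ports, no SO_REUSEPORT) can be sketched as follows; the base port is illustrative, and gRPC is touched only inside the forked children:

```python
import multiprocessing

BASE_PORT = 50051  # hypothetical base port

def worker_addresses(count, base_port=BASE_PORT):
    # One unique port per pre-forked server; a local load balancer
    # (e.g. Envoy with a static cluster) would list these as endpoints.
    return ['127.0.0.1:%d' % (base_port + i) for i in range(count)]

def serve(address):
    # Each child builds its own gRPC server after the fork.
    from concurrent import futures
    import grpc

    server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
    # register your servicer here
    server.add_insecure_port(address)
    server.start()
    server.wait_for_termination()

# Pre-fork: start one process per address before any gRPC call here:
#   for addr in worker_addresses(4):
#       multiprocessing.Process(target=serve, args=(addr,)).start()
```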

@jshlbrd

jshlbrd commented Mar 21, 2019

@gnossen any feedback on using the file system to communicate tasks between gRPC threads and subprocesses? I don't know the gRPC internals well enough to know if post-forking subprocesses that do not directly interact with the gRPC service would cause problems in this scenario. Here's some pseudo-code ...

import multiprocessing
import os
import signal
import time
import uuid
from concurrent import futures

import grpc

TMP_REQ = '/path/to/your/tmp/requests/'
TMP_RESP = '/path/to/your/tmp/responses/'

class WidgetServicer(widget_pb2_grpc.WidgetServicer):
    def __init__(self, queue):
        self.queue = queue

    def StreamWidget(self, request_iterator, context):
        resp = widget_pb2.Response()
        uid = uuid.uuid4().hex
        tmp_req = os.path.join(TMP_REQ, uid)
        tmp_resp = os.path.join(TMP_RESP, uid)

        with open(tmp_req, 'wb') as f:
            for request in request_iterator:
                f.write(request.widget)

        self.queue.put(
            {'tmp_req': tmp_req,
             'tmp_resp': tmp_resp,
             'timeout': context.time_remaining()},
        )

        while context.is_active():
            if os.path.isfile(tmp_resp):
                with open(tmp_resp, 'rb') as f:
                    resp.widget = f.read()
                os.remove(tmp_resp)
                return resp
            time.sleep(0.1)


run = 1


def main():
    def handler(sig, frame):
        global run
        run = 0

    signal.signal(signal.SIGTERM, handler)
    signal.signal(signal.SIGINT, handler)

    q = multiprocessing.Queue()
    workers = []
    for _ in range(4):
        p = Worker(q)  # multiprocessing.Process or some other kind of subprocess
        p.start()
        workers.append(p)

    executor = futures.ThreadPoolExecutor(max_workers=8)
    server = grpc.server(executor,
                         maximum_concurrent_rpcs=100)
    widget_pb2_grpc.add_WidgetServicer_to_server(WidgetServicer(q), server)
    server.add_insecure_port('127.0.0.1:8443')
    server.start()

    while run:
        for p in list(workers):
            if not p.is_alive():
                p.join()
                workers.remove(p)
                p = Worker(q)  # restart the crashed/stopped worker
                p.start()
                workers.append(p)
        time.sleep(5)

    stop = server.stop(10)
    stop.wait()
    for p in list(workers):
        p.terminate()
        p.join()
I have code that follows this pattern that appears to work, but I'd like to get some confirmation that this design doesn't cause issues with the gRPC internals.

@lidizheng
Contributor

@jshlbrd Communicating through files will result in a race condition unless you use an RWLock or a similar synchronization mechanism. In other words, you may read incomplete data.

Also, if you want to start a new Worker in the second half, using subprocess might be a better choice.

PS. A couple of days ago, @gnossen confirmed that SO_REUSEPORT does not evenly distribute the workload across processes. Thank you for pointing that out.
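One stdlib way to avoid the partial-read race without locks is to write each response to a temporary file and rename it into place; os.replace() is atomic on POSIX, so a poller sees either no file or the complete payload. A minimal sketch (the helper name is hypothetical):

```python
import os
import tempfile

def write_atomically(path, data):
    # Write to a temp file in the destination directory, then atomically
    # rename it into place; readers polling for `path` never observe a
    # partially written file.
    directory = os.path.dirname(path) or '.'
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, 'wb') as f:
        f.write(data)
    os.replace(tmp_path, path)
```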

@jshlbrd

jshlbrd commented Mar 22, 2019 via email

@jshlbrd

jshlbrd commented Mar 22, 2019

Apologies for spamming the issue, but here's a more complete example of what I described in the previous message. All of these Redis commands are atomic, which enables synchronization. Things become more complicated (but not impossible) when dealing with streaming RPCs.

import json
import multiprocessing
import signal
import time
import uuid
from concurrent import futures

import grpc
import interruptingcow
import redis

class Worker(multiprocessing.Process):  # or use some other kind of subprocess
    def __init__(self):
        super().__init__()
        # used as the task queue
        self.r0 = redis.StrictRedis(host='localhost', port=6379, db=0)
        # used to temporarily store requests
        self.r1 = redis.StrictRedis(host='localhost', port=6379, db=1)
        # used to temporarily store results
        self.r2 = redis.StrictRedis(host='localhost', port=6379, db=2)

    def run(self):
        while 1:
            task = self.r0.blpop('queue', timeout=1)  # wait forever w/ timeout=0
            if task:
                loaded_task = json.loads(task[1])
                try:
                    # interruptingcow is a convenient package for timing out
                    # operations, but you could substitute signal too
                    with interruptingcow.timeout(loaded_task['timeout'], RuntimeError):
                        request = self.r1.get(loaded_task['uid'])
                        result = process(request)  # do something with the request
                        self.r2.setex(loaded_task['uid'], 30, result)
                except RuntimeError:
                    print('RPC timed out')


class WidgetServicer(widget_pb2_grpc.WidgetServicer):
    def __init__(self):
        # used as the task queue
        self.r0 = redis.StrictRedis(host='localhost', port=6379, db=0)
        # used to temporarily store requests
        self.r1 = redis.StrictRedis(host='localhost', port=6379, db=1)
        # used to temporarily store results
        self.r2 = redis.StrictRedis(host='localhost', port=6379, db=2)

    def UnaryWidget(self, request, context):
        resp = widget_pb2.Response()
        uid = uuid.uuid4().hex
        task = {
            'uid': uid,
            '...': {},  # add any additional data provided by the request to the task
        }

        self.r1.setex(uid, 30, request.widget)
        task['timeout'] = context.time_remaining()
        self.r0.rpush('queue', json.dumps(task))

        while context.is_active():
            result = self.r2.get(uid)
            if result is not None:
                resp.widget = result
                break
            time.sleep(0.1)

        return resp


run = 1


def main():
    def handler(sig, frame):
        global run
        run = 0

    signal.signal(signal.SIGTERM, handler)
    signal.signal(signal.SIGINT, handler)

    workers = []
    for _ in range(4):
        p = Worker()
        p.start()
        workers.append(p)

    executor = futures.ThreadPoolExecutor(max_workers=4)
    server = grpc.server(executor,
                         maximum_concurrent_rpcs=100)
    widget_pb2_grpc.add_WidgetServicer_to_server(WidgetServicer(), server)
    server.add_insecure_port('127.0.0.1:8443')  # please don't do this in production
    server.start()

    while run:
        for p in list(workers):
            if not p.is_alive():
                p.join()
                workers.remove(p)
                p = Worker()  # restart the crashed/stopped worker
                p.start()
                workers.append(p)
        time.sleep(5)

    stop = server.stop(10)
    stop.wait()
    for p in list(workers):
        p.terminate()
        p.join()

There's some flexibility here with which Redis commands are used (e.g. I am explicitly not deleting keys from the Redis databases and instead letting the expiration time clean everything up) and a simple service would likely only need to utilize two Redis databases. I think this might be the only solution proposed so far that lets you safely integrate rotating subprocesses with a gRPC servicer that uses a single bound network address?

cc @gnossen @lidizheng


stale bot commented Sep 18, 2019

This issue/PR has been automatically marked as stale because it has not had any update (including commits, comments, labels, milestones, etc) for 180 days. It will be closed automatically if no further update occurs in 1 day. Thank you for your contributions!

@stale stale bot closed this as completed Sep 19, 2019
@lock lock bot locked as resolved and limited conversation to collaborators Dec 18, 2019