
[QUESTION] interplay with multiprocessing #548

Closed

ananis25 opened this issue Jan 10, 2020 · 15 comments

@ananis25

Describe the bug

I'm trying to run parallel tasks with a per-task timeout (using multiprocessing) inside an API method. When I try to terminate the child processes after the time limit, the server process itself shuts down and disconnects.

To Reproduce

  1. Create a file: repro.py
import os
import time
import uvicorn
from concurrent.futures import ProcessPoolExecutor


def simple_routine(sleep_for):
    print(f"PID {os.getpid()} has sleep time: {sleep_for}")
    time.sleep(sleep_for)
    return "done"


def test_endpoint():
    print(f"main process: {os.getpid()}")

    START_TIME = time.time()
    with ProcessPoolExecutor(max_workers=2) as pool:
        futures = [
            pool.submit(simple_routine, 1), 
            pool.submit(simple_routine, 10), 
        ]

        results = []
        for fut in futures:
            try:
                results.append(fut.result(timeout=2))
            except Exception:
                results.append("not done")

        # terminate the processes which are still running
        for pid, proc in pool._processes.items():
            print("terminating pid ", pid)
            proc.terminate()
    
    print("exiting at: ", int(time.time() - START_TIME))
    return True


async def app(scope, receive, send):
    await send({
        'type': 'http.response.start',
        'status': 200,
        'headers': [
            [b'content-type', b'text/plain'],
        ]
    })
    
    test_endpoint()
    
    await send({
        'type': 'http.response.body',
        'body': b'Hello, world!',
    })


if __name__=="__main__":
    uvicorn.run(app, host="0.0.0.0", port=5000)
  2. Run it as python repro.py.
  3. Open another Python interpreter and make these web requests.
import requests
for _ in range(20):
    print(requests.get("http://localhost:5000/").text)
  4. The server process shuts down after the first request.

Expected behavior

We start 2 processes, one of which exceeds the time limit, after which we try to terminate it. The server shouldn't shut down; it should continue serving requests. Interestingly, the server doesn't actually exit until the long-running process is complete.

INFO:     Started server process [7041]
INFO:     Uvicorn running on http://0.0.0.0:5000 (Press CTRL+C to quit)
INFO:     Waiting for application startup.
INFO:     ASGI 'lifespan' protocol appears unsupported.
INFO:     Application startup complete.
INFO:     127.0.0.1:44954 - "GET / HTTP/1.1" 200 OK
main process: 7041
PID 7060 has sleep time: 1
PID 7061 has sleep time: 10
terminating pid  7060
terminating pid  7061
exiting at:  10
INFO:     Shutting down
INFO:     Finished server process [7041]

With Flask, the behavior of an identical app is as expected.

main process: 1015
PID 1035 has run time: 1
PID 1039 has run time: 1
PID 1038 has run time: 10
terminating pid  1035
terminating pid  1038
terminating pid  1039
exiting at:  2

127.0.0.1 - - [09/Jan/2020 08:51:37] "POST /test-endpoint HTTP/1.1" 200 -

Environment

  • OS: [Ubuntu 18.04.1 LTS]
  • Uvicorn Version: 0.11.1
  • Python version: 3.6.8

Additional context

This came up while trying to port a WSGI application to FastAPI - link. On suggestion of @dmontagu, I tried to reproduce it with starlette and just uvicorn and saw that the error persists.

Hypercorn shows similar behavior in that the application shuts down after serving the first request. So, the issue likely has something to do with how async servers manage processes? Could you please point to where I might look to solve this?

Thank you for looking.

@ananis25
Author

For more context, Uvicorn shows the same behavior in WSGI mode too.

  1. Create a file wsgi_repro.py
import os
import time
from concurrent.futures import ProcessPoolExecutor
from flask import Flask


app = Flask(__name__)

def simple_routine(sleep_for):
    print(f"PID {os.getpid()} has sleep time: {sleep_for}")
    time.sleep(sleep_for)
    return "done"

@app.route("/test-endpoint", methods=['GET', 'POST'])
def test_endpoint():
    print(f"main process: {os.getpid()}")

    START_TIME = time.time()
    STOP_TIME = START_TIME + 2
    with ProcessPoolExecutor(max_workers=2) as pool:
        futures = [
            pool.submit(simple_routine, 1), 
            pool.submit(simple_routine, 10), 
        ]

        results = []
        for fut in futures:
            remains = max(STOP_TIME - time.time(), 0)
            try:
                results.append(fut.result(timeout=remains))
            except Exception:
                results.append("not done")

        # terminate the processes which are still running
        for pid, proc in pool._processes.items():
            print("terminating pid ", pid)
            proc.terminate()
    
    print("exiting at: ", int(time.time() - START_TIME))
    return "everything is good"
  2. Run using: uvicorn wsgi_repro:app --port 5000 --interface wsgi
  3. Make requests using
import requests
for _ in range(20):
    print(requests.get("http://localhost:5000/test-endpoint").text)

The Flask debug server in contrast behaves as expected.

@euri10
Member

euri10 commented Jan 11, 2020

The code below could be improved and maybe doesn't deal with all potential issues, but it might be a good start for you.
The idea is the following: ProcessPoolExecutor.submit returns a concurrent.futures.Future, so we write an AsyncProcessPool that gives us the same method but awaitable on an event loop. For that it uses asyncio.wrap_future on the return value.

import asyncio
import os
import time
from concurrent.futures import ProcessPoolExecutor
from contextlib import asynccontextmanager
from multiprocessing import get_context

from fastapi import FastAPI

import uvicorn

CONTEXT = get_context("spawn")

app = FastAPI()


class AsyncProcessPool:
    def __init__(self, executor, loop=None, ):
        self.executor = executor
        if not loop:
            loop = asyncio.get_running_loop()
        self.loop = loop
        self.pending = []
        self.result = None

    def submit(self, fn, *args, **kwargs):
        fut = asyncio.wrap_future(self.executor.submit(fn, *args, **kwargs), loop=self.loop)
        self.pending.append(fut)
        return fut


@asynccontextmanager
async def pool(max_workers=None, mp_context=CONTEXT, initializer=None, initargs=(), loop=None, return_exceptions=True):
    with ProcessPoolExecutor(max_workers=max_workers, mp_context=mp_context,initializer=initializer, initargs=initargs) as executor:
        pool = AsyncProcessPool(executor, loop=loop)
        try:
            yield pool
        finally:
            pool.result = await asyncio.gather(*pool.pending, loop=pool.loop, return_exceptions=return_exceptions)


def simple_routine(sleep_for):
    print(f"PID {os.getpid()} has sleep time: {sleep_for}")
    time.sleep(sleep_for)
    return "done"


@app.get("/")
async def test_endpoint():
    print(f"main process: {os.getpid()}")

    START_TIME = time.time()
    async with pool(max_workers=2) as p:
        # futures = [
        #     p.submit(simple_routine, 1),
        #     p.submit(simple_routine, 3),
        # ]
        #
        # results = []
        # for fut in futures:
        #     try:
        #         results.append(fut.result(timeout=2))
        #     except Exception:
        #         results.append("not done")
        await p.submit(simple_routine, 1)
        await p.submit(simple_routine, 3)
        # terminate the processes which are still running
        # for pid, proc in p._processes.items():
        #     print("terminating pid ", pid)
        #     proc.terminate()
    print(p.result)
    print("exiting at: ", int(time.time() - START_TIME))
    return True


if __name__ == '__main__':
    uvicorn.run("asgimp:app")

@ananis25
Author

Thank you @euri10 for looking into it. I'm not really conversant with async Python, so I couldn't figure out how to put a timeout on the child processes. Also, the routines relying on multiprocessing are generally a couple of levels deeper than the API method, so I'm a bit hesitant to introduce asyncio there.

That said, I don't quite see how running the ProcessPoolExecutor in an asyncio loop would help. The underlying problem, imo, is that stopping Python processes once started is generally tricky, and somehow the parent process is getting terminated when running either of the async servers.

Would it be useful to reframe the question or maybe mark it as a bug for Uvicorn (it shuts down even when running in wsgi mode, which is undesirable)?

@euri10
Member

euri10 commented Jan 13, 2020

I'm not sure (I may be totally off on this, so apologies in advance if that's the case) that even your WSGI case should behave the way you think it should.

That said, I don't quite see how running the Processpool executor in an asyncio loop would help

running uvicorn puts you in an event loop by design, so should you want to run some blocking code (time.sleep or any other non-async CPU-intensive task) then your issue is how to run multiple processes within that event loop.

usually it's about using `loop.run_in_executor`, but afaik there's no way to time out your processes
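For reference, a minimal sketch of that usual pattern (all names here are illustrative): `loop.run_in_executor` submits to the pool and `asyncio.wait_for` enforces a per-task timeout, but the timeout only abandons the result -- the worker process itself keeps running until its task finishes. The fork context is pinned purely to keep the demo self-contained on Linux.

```python
import asyncio
import multiprocessing
import time
from concurrent.futures import ProcessPoolExecutor


def blocking_task(sleep_for):
    # Stand-in for blocking / CPU-bound work.
    time.sleep(sleep_for)
    return "done"


async def run_with_timeout(pool, fn, arg, timeout):
    loop = asyncio.get_running_loop()
    try:
        return await asyncio.wait_for(loop.run_in_executor(pool, fn, arg), timeout)
    except asyncio.TimeoutError:
        # The future is abandoned, but the worker process keeps running.
        return "not done"


async def main():
    ctx = multiprocessing.get_context("fork")  # Unix-only demo context
    with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as pool:
        return await asyncio.gather(
            run_with_timeout(pool, blocking_task, 0.1, 1.0),
            run_with_timeout(pool, blocking_task, 2.0, 0.5),
        )


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['done', 'not done']
```

Note that exiting the `with` block still waits for the abandoned 2-second task to finish, which is exactly the behavior the original report complains about.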

one way to deal with that would maybe be something like below: an AsyncProcessPool that waits up to `global_timeout` for a list of tasks to complete with `asyncio.wait`, with each task having its own local timeout using the same `wrap_future` "trick" as above

other may have better ideas or find it's a bug :)

import asyncio
import os
import time
from concurrent.futures import ProcessPoolExecutor
from contextlib import asynccontextmanager
from multiprocessing import get_context

from fastapi import FastAPI

import uvicorn

CONTEXT = get_context("spawn")

app = FastAPI()


class AsyncProcessPool:
    def __init__(self, executor, loop=None, global_timeout=None):
        self.executor = executor
        if not loop:
            loop = asyncio.get_running_loop()
        self.loop = loop
        self.tasks = []
        self.result = None
        self.global_timeout = global_timeout

    def submit(self, fn, *args, **kwargs):
        timeout = kwargs.pop("timeout", None)
        confut = asyncio.wrap_future(
            self.executor.submit(fn, *args, **kwargs), loop=self.loop
        )
        fut = asyncio.wait_for(confut, timeout=timeout, loop=self.loop)
        task = asyncio.create_task(fut)
        self.tasks.append(task)
        return task


@asynccontextmanager
async def pool(
    max_workers=None,
    mp_context=CONTEXT,
    initializer=None,
    initargs=(),
    loop=None,
    return_exceptions=True,
    global_timeout = None,
):
    with ProcessPoolExecutor(
        max_workers=max_workers,
        mp_context=mp_context,
        initializer=initializer,
        initargs=initargs,
    ) as executor:
        pool = AsyncProcessPool(executor, loop=loop, global_timeout=global_timeout)
        try:
            yield pool
            done, pending = await asyncio.wait(pool.tasks, timeout=pool.global_timeout)
        except Exception as e:
            print(f"exception: {e}")
        finally:
            print("finally")
            # print(f"done: {done}")
            # print(f"pending: {pending}")



def simple_routine(sleep_for):
    print(f"PID {os.getpid()} has sleep time: {sleep_for}")
    time.sleep(sleep_for)
    return "done"


@app.get("/")
async def test_endpoint():
    print(f"main process: {os.getpid()}")
    START_TIME = time.time()
    async with pool(max_workers=3, global_timeout=4) as p:
        p.submit(simple_routine, 1, timeout=2)  # should be ok
        p.submit(simple_routine, 5, timeout=6)  # should timeout because of global
        p.submit(simple_routine, 2, timeout=1)  # should timeout local
    print(p.tasks)
    results = []
    for task in p.tasks:
        if task.done():
            if task.exception():
                results.append(task.exception())
            else:
                results.append(task.result())
        else:
            results.append("global hit")

    print(results)
    print("exiting at: ", int(time.time() - START_TIME))
    return True


if __name__ == "__main__":
    uvicorn.run("asgimp:app")

@ananis25
Author

ananis25 commented Jan 14, 2020

Thanks @euri10. This snippet, however, still waits for the longest-running process to actually finish before returning. When using the pool (ProcessPoolExecutor) from the concurrent.futures module, I don't think there is a way around it besides:

  • Call pool.shutdown() (thx @dmontagu) - Child processes running beyond timeout are rendered zombies. So, they keep running until complete, though your web service can return immediately.

  • Call pool.shutdown() and manually kill child processes - This should take care of everything. I've had mixed success with this.

# works well with flask, not with asgi
children = {pid: child for pid, child in pool._processes.items()}
pool.shutdown(wait=False)
for pid, proc in children.items():
    proc.terminate()

# using the psutil library. Seems to work with both asgi and flask.
import psutil, signal
def kill_process(process_id):
    """kill the process specified by the given id"""
    try:
        process = psutil.Process(process_id)
        process.send_signal(signal.SIGKILL)
    except psutil.NoSuchProcess:
        return

children = [child for child in pool._processes]
pool.shutdown(wait=False)
for child_id in children:
    kill_process(child_id)
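If you'd rather avoid the psutil dependency, the same kill can be done on POSIX systems with the stdlib's os.kill, which sends a signal directly to a pid just as psutil's send_signal does. A minimal sketch (Unix-only, since it uses SIGKILL; the fork context is pinned just to keep the demo deterministic):

```python
import multiprocessing
import os
import signal
import time


def kill_process(process_id):
    """Send SIGKILL to the given pid, ignoring already-dead processes."""
    try:
        os.kill(process_id, signal.SIGKILL)
    except ProcessLookupError:
        return


if __name__ == "__main__":
    ctx = multiprocessing.get_context("fork")  # Unix-only demo context
    proc = ctx.Process(target=time.sleep, args=(30,))
    proc.start()
    kill_process(proc.pid)
    proc.join(timeout=5)
    print(proc.exitcode)  # a negative exit code means the child died from that signal
```

SIGKILL cannot be caught or inherited-away, which is presumably why this worked under both WSGI and ASGI servers where terminate() did not.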

I thought it would be a bug since the wsgi servers behave as expected. Would you suggest I close this issue?

@euri10
Member

euri10 commented Jan 14, 2020

I thought it would be a bug since the wsgi servers behave as expected. Would you suggest I close this issue?

no, I'd wait for someone better versed in this to maybe give you better insights.

@ananis25
Author

Sorry for getting back late on this. In summary,

  1. Using the psutil library to send a kill signal to subprocesses seems to be working everywhere, both flask and uvicorn servers - link.
  2. The terminate method on the child processes of Python stdlib's process pool executor doesn't work under uvicorn: it ends up terminating the server process itself.

This probably has more to do with how the system signal handling works rather than uvicorn. Please feel free to close this.

@gjthompson1

gjthompson1 commented Oct 29, 2020

@ananis25 What platforms is this working on for you?

As an FYI:

  • I'm trying to get this to work on both mac and windows. Using spawn seems to work for me.
  • I'm also using PyInstaller. The Python docs say spawn doesn't work with PyInstaller on Unix, but it seems to be working for me...

Just rolling the 🎲

@ananis25
Author

Thank you for the link to the FastAPI issue, that explains the underlying issue. Using the psutil library to send the shutdown signal worked for me on both linux and mac machines back then.

@selimb

selimb commented Jan 29, 2021

Stumbled upon this as well. I believe this is a Python bug, which I just reported: https://bugs.python.org/issue43064

The issue is that on linux:

  • uvicorn installs signal handlers with loop.add_signal_handler (even though it could use signal.signal, since it doesn't interact with the event loop in the signal handler)
  • the signal handler attempts a graceful shutdown after receiving the signal once, but shuts down forcefully after receiving it twice or more
  • forked processes "inherit" the signal handler
  • consequently, sending SIGINT/SIGTERM/Ctrl+C to the uvicorn process results in a forceful shutdown if any forked process is still running
  • so far this is actually expected behaviour. The problem is: overriding the uvicorn signal handler in forked processes is not possible with signal.signal, except with the signal.SIG_DFL or signal.SIG_IGN built-in handlers -- which is not what I wanted.

My current workaround is to subclass uvicorn.Server, override install_signal_handlers and force it to use signal.signal, i.e.:

class UvicornServerPatchedSignalHandlers(uvicorn.Server):
    def install_signal_handlers(self) -> None:
        for sig in (signal.SIGINT, signal.SIGTERM):
            signal.signal(sig, self.handle_exit)

FYI, using spawn with PyInstaller unfortunately didn't work for me on CentOS.

Apologies for necro-ing, but I thought this would be useful to others.

@euri10
Member

euri10 commented Jan 29, 2021

Still useful @selimb, thanks.
That makes me think of a PyInstaller and multiprocessing weirdness one user recently faced; not sure it's related, just linking it in case: #939

@johnthagen
Contributor

johnthagen commented Jun 3, 2021

Edit: This startup method has been deprecated. See the solution here instead:


For FastAPI usage, I solved this by setting multiprocessing to use the "spawn" method in FastAPI's startup event handler:

import multiprocessing

...
app = FastAPI()


@app.on_event("startup")
def startup_event() -> None:
    multiprocessing.set_start_method("spawn")

@Mixser

Mixser commented Jun 15, 2022

Hi @selimb, here is an explanation of what is happening and why.

asyncio sets up its signal handler in a specific way -- it calls signal.set_wakeup_fd and passes the fd of an open socket. After that, any signal sent to the process will be written to this socket/fd.

Any child process will inherit not only the signal handlers' behavior but also the open socket. As a result, when we send a signal to the child process, it gets written to the socket and the parent process receives it too, even though the signal wasn't sent to it; likewise, if you send a signal to the parent process, the child process receives it as well.

To avoid this behavior, execute the following code at the very beginning of the child process:

signal.set_wakeup_fd(-1) # don't send the signal into shared socket

signal.signal(signal.SIGTERM, signal.SIG_DFL) # reset signal handlers to default
signal.signal(signal.SIGINT, signal.SIG_DFL) # reset signal handlers to default
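A sketch of how that reset could be wired into a ProcessPoolExecutor through its initializer hook, so every worker detaches itself from the parent's wakeup fd before doing any work (the function names are illustrative; the fork context is pinned because the inheritance problem described above concerns forked children):

```python
import multiprocessing
import signal
from concurrent.futures import ProcessPoolExecutor


def reset_signals():
    # Runs once in each worker process, before any task:
    # detach from the parent's wakeup fd and restore default handlers,
    # so signals delivered to a child are not echoed into the parent's loop.
    signal.set_wakeup_fd(-1)
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    signal.signal(signal.SIGINT, signal.SIG_DFL)


def work(x):
    return x * 2


if __name__ == "__main__":
    ctx = multiprocessing.get_context("fork")  # Unix-only demo context
    with ProcessPoolExecutor(
        max_workers=2, mp_context=ctx, initializer=reset_signals
    ) as pool:
        print(list(pool.map(work, [1, 2, 3])))  # [2, 4, 6]
```

The initializer runs in each worker's main thread, which is where set_wakeup_fd and signal.signal must be called.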

PS. I downloaded the example from https://bugs.python.org/issue43064 and added signal.set_wakeup_fd(-1) as the first line of the worker_sync method. As a result, I got the expected behavior (one handler call in the main process, and one in each of the three children):

[4560] handling signal with asyncio
[4560] main
[4561] worker sleeping...
[4560] 3 procs still alive
[4563] worker sleeping...
[4562] worker sleeping...
[4560] 3 procs still alive
[4560] 3 procs still alive
^C[4563] handle_sig_worker (2, <frame at 0x1012d8220, file '/Users/mike/Work/fast-api-multiprocessing-problems/exp.py', line 77, code worker_sync>) {}
[4562] handle_sig_worker (2, <frame at 0x1012d8220, file '/Users/mike/Work/fast-api-multiprocessing-problems/exp.py', line 77, code worker_sync>) {}
[4561] handle_sig_worker (2, <frame at 0x1012c7d60, file '/Users/mike/Work/fast-api-multiprocessing-problems/exp.py', line 77, code worker_sync>) {}
[4560] handle_sig_main () {}
[4560] 3 procs still alive
[4560] 3 procs still alive
[4561] worker done
[4563] worker done
[4562] worker done
[4560] no procs alive
[4560] main done

@selimb

selimb commented Jun 18, 2022

@Mixser Fascinating! Many thanks for the detailed explanation. I was able to replicate your results as well.
🙏

@johnthagen
Contributor

johnthagen commented Nov 2, 2023

Since my previous solution (using @app.on_event("startup")) has been deprecated, here is an updated solution using the new Lifespan Events.

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
import multiprocessing

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
    """Startup/shutdown logic."""
    multiprocessing.set_start_method("spawn")
    yield


app = FastAPI(lifespan=lifespan)
