# Keeping an Expensive Process in the Background
Some functions are expensive because they launch an external process that has a long startup cost.
If you can modify that external code to process more than one input before exiting (not all codes allow this), you can follow this example to keep the process alive between function calls.

In [1]:
from subprocess import Popen, PIPE
from time import sleep
import socket
import json
import sys

## A Primer: Pipes
Processes can communicate over "pipes": one process writes to one end of the pipe, and a second process reads from the other end.
Our first task is to show how to create these pipes and send data over them.

For this example, we emulate pipes with a pair of connected sockets.
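For comparison, the minimal sketch below creates a real operating-system pipe with `os.pipe()`, which returns a pair of file descriptors (read end, write end) that `os.fdopen` wraps as file objects. One plausible reason to prefer sockets in this notebook (our assumption, not stated in the original) is that on Windows the `selectors` machinery used later only supports sockets, while on Unix it also accepts pipes.

```python
import os

# os.pipe() returns two file descriptors: one for reading, one for writing
read_fd, write_fd = os.pipe()

# Wrap the raw descriptors in text-mode file objects
read_end = os.fdopen(read_fd, 'r')
write_end = os.fdopen(write_fd, 'w')

# Write a line, flush it into the pipe, then read it back from the other end
print('Hello from a real pipe!', file=write_end, flush=True)
message = read_end.readline()
print(repr(message))  # 'Hello from a real pipe!\n'

# Closing the file objects also closes the underlying descriptors
write_end.close()
read_end.close()
```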

Make a "pipe" whose ends act like file objects:

In [2]:
read_socket, write_socket = socket.socketpair()
read_pipe = read_socket.makefile()
write_pipe = write_socket.makefile(mode='w')

We can write to one end and then read the message from the other:

In [3]:
print('Hello!', file=write_pipe, flush=True)  # Flush ensures the message is sent

In [4]:
read_pipe.readline()

'Hello!\n'

Now that we have these, let's build the sending function. All it does is package the data into a string message, which we encode as JSON because many languages can parse it.

In [5]:
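def send_inputs(pipe, *args, **kwargs):
    """Send the inputs to the subprocess
    
    Args:
        pipe: Pipe over which to send inputs
        args: Positional arguments for the function
        kwargs: Keyword arguments for the function
    """
    msg = json.dumps({'args': args, 'kwargs': kwargs})  # Encode the inputs as a single line of JSON
    print(msg, file=pipe, flush=True)  # print adds the newline that marks the end of the message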

In [6]:
send_inputs(write_pipe, 1, test='hello!')

We can read it from the other end of the pipe and decode it using JSON:

In [7]:
msg = read_pipe.readline()
json.loads(msg[:-1])

{'args': [1], 'kwargs': {'test': 'hello!'}}
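Notice that `args` came back as a list even though `send_inputs` received it as a tuple: JSON has no tuple type, so `json.dumps` writes tuples as arrays and `json.loads` reads arrays back as lists. The quick check below shows that round trip, that `json.loads` tolerates the trailing newline, and that arguments must be JSON-serializable (a `set`, for example, is rejected).

```python
import json

# *args is collected as a tuple; JSON serializes tuples as arrays
payload = {'args': (1, 2.5), 'kwargs': {'test': 'hello!'}}
encoded = json.dumps(payload)
print(encoded)  # {"args": [1, 2.5], "kwargs": {"test": "hello!"}}

# Decoding yields a list, not a tuple
decoded = json.loads(encoded)
print(type(decoded['args']).__name__)  # list

# json.loads also accepts the trailing newline that print() appends
assert json.loads(encoded + '\n') == decoded

# Arguments must be JSON-serializable; a set raises TypeError
try:
    json.dumps({'args': ({1, 2},), 'kwargs': {}})
except TypeError as exc:
    print('Not serializable:', exc)
```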

The problem with pipes is that `readline` blocks until a full line arrives or the writer closes the pipe.
That means the external program will never exit if no new inputs are coming.

We avoid that by having the external code exit after a timeout expires. The `readline` function does not have a timeout, so we use ["selectors"](https://docs.python.org/3/library/selectors.html) to impose one.

> The details of this are specific to Python, so you may need to find a different mechanism for your external code's language.

In [8]:
from selectors import DefaultSelector, EVENT_READ
from typing import Optional

In [9]:
def read_inputs(pipe, timeout: float = 15) -> Optional[dict]:
    """Read one input message from the pipe

    Args:
        pipe: Pipe from which to read inputs
        timeout: Timeout in seconds
    Returns:
        Input arguments, or ``None`` if nothing arrived before the timeout or the pipe was closed
    """

    # We use a Selector to wait for data on the pipe
    #  See: https://docs.python.org/3/library/selectors.html#selectors.DefaultSelector
    with DefaultSelector() as sel:  # The context manager closes the selector when done
        sel.register(pipe, EVENT_READ)  # Wait until the pipe is readable

        # Blocks until the pipe is ready to read or the timeout has passed
        events = sel.select(timeout=timeout)

    # Return None if the pipe is not ready, which appears as an empty "events" list
    if len(events) == 0:
        return None

    # If the other end is closed, readline returns an empty string. Return None if this happens
    msg = pipe.readline()
    if len(msg) == 0:
        return None

    # Decode the message
    return json.loads(msg[:-1])  # Last character is a newline

In [10]:
%%time
read_inputs(read_pipe, timeout=1)

CPU times: user 2.99 ms, sys: 684 µs, total: 3.68 ms
Wall time: 1 s


Note how this call waits one second and then returns `None`, because nothing has been sent yet.

In [11]:
send_inputs(write_pipe, 1, 2)

In [12]:
read_inputs(read_pipe)

{'args': [1, 2], 'kwargs': {}}
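One caveat when mixing `selectors` with buffered file objects: `readline` may pull more than one line from the operating system into Python's internal buffer. The selector only watches the underlying socket, so a message already sitting in that buffer does not count as "ready", and the read times out even though input is waiting. The `remote_function` pattern later in this notebook sends one input and waits for its reply before sending the next, so it should not hit this; the sketch below (with a compact, renamed copy of `read_inputs` so it runs on its own) shows what happens when two messages arrive back to back. The same should apply to a buffered `sys.stdin` in the external process.

```python
import json
import socket
from selectors import DefaultSelector, EVENT_READ
from typing import Optional


def read_inputs_sketch(pipe, timeout: float = 15) -> Optional[dict]:
    """Compact stand-in for read_inputs: wait for data, then read one line (None on timeout or EOF)"""
    with DefaultSelector() as sel:
        sel.register(pipe, EVENT_READ)
        if not sel.select(timeout=timeout):
            return None  # The socket itself has no unread data
    msg = pipe.readline()
    return json.loads(msg) if msg else None


r_sock, w_sock = socket.socketpair()
r_pipe = r_sock.makefile()  # Buffered text-mode reader, like read_pipe above

# Send two messages back to back, before reading either one
w_sock.sendall(b'{"args": [1], "kwargs": {}}\n{"args": [2], "kwargs": {}}\n')

first = read_inputs_sketch(r_pipe, timeout=1)   # readline pulls BOTH lines into the buffer
second = read_inputs_sketch(r_pipe, timeout=1)  # The socket now looks idle, so this times out
print(first, second)

# The second message was not lost: it is waiting in the file object's buffer
leftover = json.loads(r_pipe.readline())
print(leftover)

r_pipe.close()
r_sock.close()
w_sock.close()
```

Keeping a strict one-request, one-reply exchange is the simplest way to stay clear of this behavior.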

Great! You can now send data between an external process and your Python function over a pipe that persists across calls.

In [13]:
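read_pipe.close()  # Close the file objects too; the socket stays open while they exist
write_pipe.close()
read_socket.close()
write_socket.close()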

## Keeping Process and Pipes Persistent
The next ingredient is to keep these pipes and the process they communicate with alive between calls to a function. 
We can use the [strategy of keeping them as module-level globals discussed in our other example](../global-state/README.md).

Our first step is to create the code for the external process. We write it in Python and use a version of the `read_inputs` above, modified into an iterator that keeps yielding new inputs until none arrive before the timeout. See [`executable.py`](./executable.py) for full details.
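We do not reproduce `executable.py` here. Based on the behavior shown later (the traceback mentions `y = function(*inputs['args'], **inputs['kwargs'])` and a `function` that returns `x + 1`, the first call takes about 5 s, and the process exits after about 5 s without input), a hypothetical script with the same structure might look like the sketch below. The names `iterate_inputs`, `main`, and `startup_delay` are our own inventions, not taken from the real file.

```python
"""Hypothetical sketch of an executable.py-style worker (not the original file)"""
import json
import sys
import time
from selectors import DefaultSelector, EVENT_READ
from typing import Iterator, TextIO


def iterate_inputs(pipe: TextIO, timeout: float = 5.0) -> Iterator[dict]:
    """Yield decoded inputs until no line arrives within ``timeout`` seconds or the pipe closes"""
    with DefaultSelector() as sel:
        sel.register(pipe, EVENT_READ)
        while True:
            if not sel.select(timeout=timeout):
                return  # Timed out waiting for input
            msg = pipe.readline()
            if not msg:
                return  # The other end closed the pipe
            yield json.loads(msg)


def function(x):
    """Stand-in for the expensive computation"""
    return x + 1


def main(in_pipe: TextIO = sys.stdin, out_pipe: TextIO = sys.stdout,
         timeout: float = 5.0, startup_delay: float = 0.0) -> None:
    """Pay the startup cost once, then answer inputs until the idle timeout expires"""
    time.sleep(startup_delay)  # Emulates a slow start-up (hypothetical)
    for inputs in iterate_inputs(in_pipe, timeout):
        y = function(*inputs['args'], **inputs['kwargs'])  # An exception here crashes the worker
        print(json.dumps(y), file=out_pipe, flush=True)  # One JSON line per result
```

Saved as a script, it would end with `if __name__ == '__main__': main(startup_delay=5)` so that `launch_subprocess` can start it; that line is left out of the block above.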

We launch that process using Python's [`subprocess` module](https://docs.python.org/3/library/subprocess.html), which has a few important options to set, described in the next cell.

In [14]:
def launch_subprocess(stderr_path: str = 'subprocess.err') -> Popen:
    """Launch the subprocess
    
    Args:
        stderr_path: Path where we should write the standard error
    Returns:
        Handle to the running subprocess
    """

    subprocess_stderr = open(stderr_path, 'w')  # A file where the subprocess writes error messages
    subprocess = Popen(
        args=[
            sys.executable,  # Uses the same Python as this notebook, but could be anything
            'executable.py'  # It is better to use absolute paths, but I know the script is in the same directory as this notebook
        ],
        stdin=PIPE,  # PIPE lets me write to and read from the subprocess
        stdout=PIPE,
        stderr=subprocess_stderr,
        text=True  # Critical because I'm sending and receiving JSON strings, not bytes
    )
    
    # Attach the stderr information for safe keeping
    subprocess.stderr = subprocess_stderr
    subprocess.stderr_path = stderr_path
    return subprocess


subprocess = launch_subprocess()

The `Popen` object gives us one pipe we can send inputs to (`subprocess.stdin`) and a second we can read results from (`subprocess.stdout`).
These pipes are the standard input and output of the process.

In [15]:
send_inputs(subprocess.stdin, 1)
assert subprocess.poll() is None, "Subprocess died"

In [16]:
subprocess.stdout.readline()

'2\n'

In [17]:
subprocess.stderr.close()

The subprocess should exit on its own if we wait a few seconds:

In [18]:
sleep(6)  # I know it times out after 5 seconds
assert subprocess.poll() == 0, f"Process did not exit cleanly. Return code={subprocess.poll()}"
subprocess.stderr.close()  # Close the file pointer

The logic for caching this function is a bit more complicated than in our previous example.
Instead of just checking whether the subprocess has been launched, we have to check whether it is launched _and alive_.

In [19]:
from subprocess import TimeoutExpired
from typing import Optional

_subprocess: Optional[Popen] = None
def remote_function(x: float) -> float:
    """The warmable function that actually runs an external process
    
    Args:
        x: Input value
    Returns:
        Output value
    """
    global _subprocess
    
    # Launch a subprocess if none exists yet, or replace one that has exited
    if _subprocess is None:
        _subprocess = launch_subprocess()
    elif _subprocess.poll() is not None:
        _subprocess.stderr.close()  # Close the old standard error
        _subprocess = launch_subprocess()  # Make a new one
    
    # Send the inputs to this subprocess
    send_inputs(_subprocess.stdin, x)
    
    # Wait to get the outputs
    output = _subprocess.stdout.readline()
    
    # If the output is blank, the process closed its stdout, meaning it crashed
    if len(output) == 0:
        # stdout can close a moment before the exit code is available, so wait briefly instead of polling once
        try:
            _subprocess.wait(timeout=5)
        except TimeoutExpired:
            raise RuntimeError("Process closed its output but is still running")
        # Close our handle on the error file, then read the error messages it recorded
        _subprocess.stderr.close()
        with open(_subprocess.stderr_path) as fp:
            raise ValueError(f'External Process Failed:\n\n{fp.read()}')
    else:
        return json.loads(output[:-1])  # Last character is a new-line

The first call to this function pays the large startup cost.

In [20]:
%%time
remote_function(1.)

CPU times: user 1.49 ms, sys: 448 µs, total: 1.94 ms
Wall time: 5.02 s


2.0

The second call is fast because the program is already running.

In [21]:
%%time
remote_function(1.)

CPU times: user 0 ns, sys: 713 µs, total: 713 µs
Wall time: 545 µs


2.0

If we wait longer than the timeout, the external process exits and must be restarted on the next call.

In [22]:
sleep(8)

In [23]:
%%time
remote_function(1.)

CPU times: user 1.9 ms, sys: 2.24 ms, total: 4.14 ms
Wall time: 5.02 s


2.0

For completeness, our function also reports errors from the external process:

In [24]:
try:
    remote_function('bad')
except ValueError as exc:
    print(exc)

External Process Failed:

Traceback (most recent call last):
  File "executable.py", line 63, in <module>
    y = function(*inputs['args'], **inputs['kwargs'])
  File "executable.py", line 49, in function
    return x + 1
TypeError: can only concatenate str (not "int") to str



However, the crash means the next call must restart the process:

In [25]:
%%time
remote_function(1.)

CPU times: user 0 ns, sys: 2.8 ms, total: 2.8 ms
Wall time: 5.02 s


2.0

## Advanced
A few things to consider as you learn to use this technique:

- *Avoid sending large data over pipes.* Write to the filesystem instead and send the path of where to find inputs to the external process.
- *Redirect stdout for noisy executables.* Some programs print helpful messages to stdout, which our calling function might mistake for the result. A few options are [redirecting stdout](https://docs.python.org/3/library/contextlib.html#contextlib.redirect_stdout) (if the external code is Python) to [devnull](https://linuxhint.com/what_is_dev_null/) or a local file, adding a special flag to the stdout line that contains the answer, or communicating over special-purpose sockets.
- *Run each executable in a temporary directory.* Many codes write files to their working directory; use a [temporary directory](https://docs.python.org/3/library/tempfile.html#tempfile.mkdtemp) and the `cwd` option of `Popen` to keep copies of a function from interfering with each other.
- *Return errors via stdout.* Our example above exits when an error occurs, which forces a slow restart on the next call. Instead of crashing, have your executable return error messages via stdout along with an indicator of whether the call succeeded.
- *Beware stdin/stdout in parallel codes.* Only print results or read inputs from a single rank of a parallel program.
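The *noisy stdout* and *returning errors via stdout* suggestions can be combined into a small line protocol. In the minimal sketch below, the `RESULT:` prefix and the `success`/`result`/`error` keys are our own conventions, not part of the original example: the worker catches exceptions instead of crashing and tags the line carrying its answer, while the caller skips any untagged chatter. An in-memory `io.StringIO` stands in for the subprocess's stdout.

```python
import io
import json
import traceback
from typing import Any, Callable, TextIO

PREFIX = 'RESULT:'  # Hypothetical marker for the one stdout line that carries the answer


def respond(function: Callable, inputs: dict, out_pipe: TextIO) -> None:
    """Worker side: run ``function`` and report success or failure instead of exiting"""
    try:
        reply = {'success': True, 'result': function(*inputs['args'], **inputs['kwargs'])}
    except Exception:
        reply = {'success': False, 'error': traceback.format_exc()}
    print(PREFIX + json.dumps(reply), file=out_pipe, flush=True)  # json.dumps keeps it on one line


def read_reply(pipe: TextIO) -> Any:
    """Caller side: skip unmarked (noisy) lines, then return the result or raise the remote error"""
    for line in pipe:
        if line.startswith(PREFIX):
            reply = json.loads(line[len(PREFIX):])
            if reply['success']:
                return reply['result']
            raise ValueError(f"External function failed:\n\n{reply['error']}")
    raise EOFError('External process closed its output without replying')


# Demo with an in-memory buffer standing in for the subprocess's stdout
buffer = io.StringIO()
print('Loading model...', file=buffer)  # Chatter that a noisy program might print
respond(lambda x: x + 1, {'args': [1], 'kwargs': {}}, buffer)
respond(lambda x: x + 1, {'args': ['bad'], 'kwargs': {}}, buffer)
buffer.seek(0)

first = read_reply(buffer)  # Skips the chatter and returns 2
try:
    read_reply(buffer)
except ValueError as exc:
    error_text = str(exc)  # Contains the worker's TypeError traceback
```

Because the worker survives a bad input, the caller gets the error message without paying the restart cost shown above.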
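Similarly, the *large data* and *temporary directory* suggestions fit together: give each worker its own scratch directory through `Popen`'s `cwd` argument, write bulky inputs to a file there, and send only the file name over the pipe. The sketch below uses a tiny inline Python worker (passed with `python -c`) as a stand-in for a real executable; the worker code and the `inputs.json` file name are illustrative assumptions.

```python
import json
import os
import shutil
import sys
import tempfile
from subprocess import Popen, PIPE

# Hypothetical worker: reads a file name from stdin, loads it relative to its cwd, replies with the sum
WORKER = (
    "import json, sys\n"
    "for line in sys.stdin:\n"
    "    with open(line.strip()) as fp:\n"
    "        data = json.load(fp)\n"
    "    print(json.dumps(sum(data)), flush=True)\n"
)

workdir = tempfile.mkdtemp(prefix='worker-')  # Private scratch space for this copy of the worker
try:
    # Write the (potentially large) input to a file instead of sending it over the pipe
    with open(os.path.join(workdir, 'inputs.json'), 'w') as fp:
        json.dump(list(range(100_000)), fp)

    proc = Popen([sys.executable, '-c', WORKER], cwd=workdir,
                 stdin=PIPE, stdout=PIPE, text=True)
    print('inputs.json', file=proc.stdin, flush=True)  # Only the relative path crosses the pipe
    total = json.loads(proc.stdout.readline())
    print(total)  # 4999950000

    proc.stdin.close()  # EOF ends the worker's loop so it exits
    proc.wait(timeout=10)
    proc.stdout.close()
finally:
    shutil.rmtree(workdir)  # mkdtemp directories are not removed automatically
```

Because the worker resolves `inputs.json` relative to its own working directory, two copies launched with different `cwd` values cannot overwrite each other's files.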