In [2]:
# Scratch cell: a bare expression, echoed back as the cell output.
3

3

In [3]:
# ``%pip`` (unlike ``!pip``) installs into the interpreter running this kernel.
# Pin the version this notebook was executed with so re-runs are reproducible.
%pip install loky==2.9.0

  Using cached loky-2.9.0-py2.py3-none-any.whl (67 kB)
Collecting cloudpickle
  Downloading cloudpickle-1.6.0-py3-none-any.whl (23 kB)
Installing collected packages: cloudpickle, loky
Successfully installed cloudpickle-1.6.0 loky-2.9.0

You should consider upgrading via the 'c:\python39\python.exe -m pip install --upgrade pip' command.


In [4]:
import loky

In [5]:
from loky import get_reusable_executor

# Ask loky for its reusable executor with 4 workers and show which executor
# instance was handed out. The (misspelt) name ``excutor`` is kept because
# later cells refer to it.
excutor = get_reusable_executor(max_workers=4)
current_id = excutor.executor_id
print(current_id)

0


In [6]:
# Asking again with the same arguments should return the existing executor
# rather than start a new pool, so ``executor_id`` must not change.
previous_executor_id = excutor.executor_id
excutor = get_reusable_executor(max_workers=4)
assert excutor.executor_id == previous_executor_id, excutor.executor_id
print(excutor.executor_id)

0


In [7]:
# A further request with identical arguments must still hand back the same
# executor: its ``executor_id`` stays unchanged.
previous_executor_id = excutor.executor_id
excutor = get_reusable_executor(max_workers=4)
assert excutor.executor_id == previous_executor_id, excutor.executor_id
print(excutor.executor_id)

0


In [8]:
# Evaluate the builtin ``id`` on 42 inside a worker process and display the
# value reported there (it varies with the worker that ran the task).
worker_future = excutor.submit(id, 42)
worker_future.result()

1941407100496

In [9]:
# Same request again; a different worker may answer, giving a different value.
worker_future = excutor.submit(id, 42)
worker_future.result()

1634479992400

In [10]:
# Same request again; compare with the previous outputs to see which worker ran it.
worker_future = excutor.submit(id, 42)
worker_future.result()

1941407100496

In [11]:
# Same request once more; the result again depends on the worker that ran it.
worker_future = excutor.submit(id, 42)
worker_future.result()

1634479992400

In [13]:
from time import sleep
import multiprocessing as mp
import loky
from loky import get_reusable_executor

# Keep the initialization status as an attribute of the ``loky`` module, so it
# acts as a process-wide global. In this parent process it stays
# "uninitialized"; only the executor's workers run the initializer.
setattr(loky, "_INITIALIZER_STATUS", "uninitialized")


def initializer(x):
    """Worker initializer: log the worker's name, then record ``x`` as its status.

    The status is stored on the ``loky`` module (``loky._INITIALIZER_STATUS``)
    of the process that runs this function.
    """
    worker_name = mp.current_process().name
    print("[{}] init".format(worker_name))
    setattr(loky, "_INITIALIZER_STATUS", x)


def return_initializer_status(delay=0):
    """Report the initializer status seen by the process running this call.

    Sleeps ``delay`` seconds first, then returns ``loky._INITIALIZER_STATUS``,
    or ``'uninitialized'`` when that attribute was never set in this process.
    """
    sleep(delay)
    status = getattr(loky, '_INITIALIZER_STATUS', 'uninitialized')
    return status


executor = get_reusable_executor(
    max_workers=2, initializer=initializer, initargs=('initialized',),
    context="loky", timeout=1000)

# The initializer runs in the worker processes only: the parent's status is
# left untouched, while a task executed by a worker sees 'initialized'.
assert loky._INITIALIZER_STATUS == "uninitialized"
status = executor.submit(return_initializer_status).result()
print(status)
assert status == 'initialized', status

# With reuse=True, the executor keeps using the same initializer, even after
# being resized to 4 workers. The 0.5 s delay presumably spreads the 4 tasks
# over all the workers.
executor = get_reusable_executor(max_workers=4, reuse=True)
for status in executor.map(return_initializer_status, [.5] * 4):
    assert status == 'initialized', status

# With reuse='auto', a new executor is created without the initializer, so
# the workers report the default status.
executor = get_reusable_executor(max_workers=4)
for status in executor.map(return_initializer_status, [.1] * 4):
    assert status == 'uninitialized', status

initialized


In [15]:
import os
from loky import get_reusable_executor


def func_async(i):
    """Return ``(2 * i, pid)``, where ``pid`` is the id of the process running it."""
    # Imported locally, presumably so the function is self-contained when
    # shipped to a worker process.
    import os
    return 2 * i, os.getpid()


def test_1():
    executor = get_reusable_executor(max_workers=1)
    return executor.submit(func_async, 1)


def test_2():
    executor = get_reusable_executor(max_workers=1)
    return executor.submit(func_async, 2)


def test_3():
    executor = get_reusable_executor(max_workers=1)
    return executor.submit(func_async, 3)


f1 = test_1()
f2 = test_2()
f3 = test_3()

main_pid = os.getpid()
results = [future.result() for future in (f1, f2, f3)]

pids = [pid for _, pid in results]

# test_1/test_2/test_3 submit func_async with the inputs 1, 2 and 3.
for i, (val, pid) in zip((1, 2, 3), results):
    assert val == 2 * i
    assert pid != main_pid
    print(pid)
print("All the jobs were run in a process different from the main process")

# The executor is reused with max_workers=1, so every job ran in one worker.
assert len(set(pids)) == 1
print("All the computations were run in a single `ProcessPoolExecutor` with "
      "worker pid={}".format(pids[0]))

9280
9280
9280
All the jobs were run in a process different from main process
All the computation where run in a single `ProcessPoolExecutor` with worker pid=9280


In [16]:
import sys
import time
import traceback
from loky import set_loky_pickler
from loky import get_reusable_executor
from loky import wrap_non_picklable_objects

###############################################################################
# First, define functions which cannot be sent to the workers with the standard
# ``pickle`` protocol: ``pickle`` serializes functions by reference (module and
# name), and the worker processes cannot look up objects defined in the
# parent's ``__main__`` module. They can however be serialized by value with
# ``cloudpickle``.
#


def func_async(i, *args):
    """Return ``2 * i``; any extra positional arguments are accepted and ignored.

    The extra arguments let callers ship a large payload to the worker purely
    to measure its serialization cost.
    """
    result = 2 * i
    return result


###############################################################################
# By default, ``loky`` uses ``cloudpickle`` to serialize the objects that are
# sent to the workers.
#

# Default pickler (cloudpickle): a function defined in __main__ can be sent.
executor = get_reusable_executor(max_workers=1)
doubled = executor.submit(func_async, 21).result()
print(doubled)


###############################################################################
# For most use-cases, ``cloudpickle`` is efficient enough. However, it can be
# very slow to serialize large python objects, such as dicts or lists,
# compared to the standard ``pickle`` serialization.
#

# Pass an extra argument holding a large list (or another large python
# object) so that the timing reflects the cost of serializing it.
large_list = list(range(1000000))

# ``time.perf_counter`` is a monotonic, high-resolution clock intended for
# measuring durations (``time.time`` can jump when the system clock changes).
t_start = time.perf_counter()
executor = get_reusable_executor(max_workers=1)
executor.submit(func_async, 21, large_list).result()
print("With cloudpickle serialization: {:.3f}s".format(
    time.perf_counter() - t_start))


###############################################################################
# To mitigate this, it is possible to fully rely on ``pickle`` to serialize
# all communications between the main process and the workers. This can be done
# with an environment variable ``LOKY_PICKLER=pickle`` set before the
# script is launched, or with the switch ``set_loky_pickler`` provided in the
# ``loky`` API.
#

# Now set the loky pickler to the ``pickle`` serialization from the stdlib.
# ``func_async`` is defined in ``__main__`` and cannot be sent to the workers
# with this pickler, so it is replaced by the builtin ``id`` for demonstration
# purposes. Note: the pickler is never reset below, so later cells also run
# with ``pickle``.
set_loky_pickler('pickle')
t_start = time.perf_counter()
executor = get_reusable_executor(max_workers=1)
executor.submit(id, large_list).result()
print("With pickle serialization: {:.3f}s".format(
    time.perf_counter() - t_start))


###############################################################################
# However, functions and objects defined in ``__main__`` can no longer be sent
# to the workers: ``pickle`` serializes them by reference and the workers
# cannot look them up, so ``func_async`` cannot be called with this pickler.
#

# Expected to fail: the workers cannot unpickle ``func_async`` from __main__.
# The traceback is printed instead of stopping the notebook.
try:
    executor = get_reusable_executor(max_workers=1)
    pending = executor.submit(func_async, 21, large_list)
    pending.result()
except Exception:
    traceback.print_exc(file=sys.stdout)


###############################################################################
# ``loky`` provides a wrapper function
# :func:`wrap_non_picklable_objects` to wrap the non-picklable function and
# indicate to the serialization process that this specific function should be
# serialized using ``cloudpickle``. This changes the serialization behavior
# only for this function and keeps using ``pickle`` for all other objects. The
# drawback of this solution is that it modifies the object. This should not
# cause many issues with functions but can have side effects with object
# instances.
#

@wrap_non_picklable_objects
def func_async_wrapped(i, *args):
    """Return ``2 * i``, ignoring any extra positional arguments.

    Same computation as ``func_async``; the decorator marks this function to be
    serialized with ``cloudpickle`` while the other objects keep using the
    configured loky pickler.
    """
    doubled = 2 * i
    return doubled


# The loky pickler is still ``pickle`` (set earlier with set_loky_pickler);
# only the wrapped function itself goes through ``cloudpickle``.
t_start = time.perf_counter()
executor = get_reusable_executor(max_workers=1)
executor.submit(func_async_wrapped, 21, large_list).result()
print("With pickle serialization and wrapper: {:.3f}s".format(
    time.perf_counter() - t_start))


###############################################################################
# The same wrapper can also be used for non-picklable classes. Note that the
# side effects of :func:`wrap_non_picklable_objects` on objects can break magic
# methods such as ``__add__`` and can mess up the ``isinstance`` and
# ``issubclass`` functions. Some improvements will be considered if use-cases
# are reported.
#

42
With cloudpickle serialization: 0.118s
With pickle serialization: 0.120s
loky.process_executor._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "C:\Python39\lib\site-packages\loky\process_executor.py", line 404, in _process_worker
    call_item = call_queue.get(block=True, timeout=timeout)
  File "C:\Python39\lib\multiprocessing\queues.py", line 122, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'func_async' on <module '__main__' (built-in)>
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "<ipython-input-16-85316087e36b>", line 71, in <module>
    executor.submit(func_async, 21, large_list).result()
  File "C:\Python39\lib\concurrent\futures\_base.py", line 440, in result
    return self.__get_result()
  File "C:\Python39\lib\concurrent\futures\_base.py", line 389, in __get_result
    raise self._exception
loky.process_executor.BrokenProcessPool: A task has failed 