Skip to content

Commit

Permalink
[local-worker-mgr] Force using 'spawn' for process creation (#307)
Browse files Browse the repository at this point in the history
  • Loading branch information
mtrofin committed Oct 18, 2023
1 parent 2f20686 commit 25c5254
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions compiler_opt/distributed/local/local_worker_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ class TaskResult:
value: Any


def _get_context():
return multiprocessing.get_context('spawn')


SerializedClass = bytes


Expand All @@ -74,6 +78,7 @@ def _run_impl(pipe: connection.Connection, worker_class: SerializedClass, *args,
# jobs, this effectively limits the number of clang instances spawned.
pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
obj = cloudpickle.loads(worker_class)(*args, **kwargs)
obj = cloudpickle.loads(worker_class)(*args, **kwargs)

# Pipes are not thread safe
pipe_lock = threading.Lock()
Expand Down Expand Up @@ -122,15 +127,15 @@ class _Stub:
"""Client stub to a worker hosted by a process."""

def __init__(self):
parent_pipe, child_pipe = multiprocessing.get_context().Pipe()
parent_pipe, child_pipe = _get_context().Pipe()
self._pipe = parent_pipe
self._pipe_lock = threading.Lock()

# this is the process hosting one worker instance.
# we set aside 1 thread to coordinate running jobs, and the main thread
# to handle high priority requests. The expectation is that the user
# achieves concurrency through multiprocessing, not multithreading.
self._process = multiprocessing.get_context().Process(
self._process = _get_context().Process(
target=functools.partial(_run, child_pipe, cloudpickle.dumps(cls), *
args, **kwargs))
# lock for the msgid -> reply future map. The map will be set to None
Expand Down

0 comments on commit 25c5254

Please sign in to comment.