Manager().Pool gets stuck with billiard, works with multiprocessing #340

Closed
mandarup opened this issue Sep 13, 2021 · 2 comments

mandarup commented Sep 13, 2021

The functional example below works with multiprocessing but hangs at pool.starmap with billiard. Is there anything wrong with this example, or does it need to be adapted for billiard somehow?

billiard==3.6.4.0
Python 3.9.0
macOS Catalina

# import multiprocessing as mproc
import billiard as mproc


import os
import functools
import numpy as np
import random
import logging
import time

logger = logging.getLogger(__name__)


def child_proc(lock, task_id, shared_best_result):
    """A simple process that conditionally updates a shared variable.
    """
    #time.sleep(1)
    
    # or dummy calc to stress cpu worker
    dummy_calc = np.random.rand(10000, 10000) * np.random.rand(10000, 10000)
    rnd = random.randint(1, 1000)
    print(task_id, rnd)
    with lock:
        if rnd < shared_best_result['best']:
            shared_best_result['best'] = rnd
            shared_best_result['task_id'] = task_id
            print(f'New solution found : {dict(shared_best_result)}')


def run_parallel(num_multiple_random_starts=5):
    with mproc.Manager() as manager:

        # create a variable shared across all worker processes
        shared_best_result = manager.dict()
        best_results_schema = {
            'task_id': None,
            'time': None,
            'best': np.inf,
        }
        for k,v in best_results_schema.items():
            shared_best_result[k] = v

        lock = manager.Lock()
        # functools partial will implicitly pass lock as first argument
        child_partial = functools.partial(child_proc, lock)

        with manager.Pool(processes=5) as pool:
            result = pool.starmap(child_partial, [(
                task_id,
                shared_best_result,
            ) for task_id in range(num_multiple_random_starts)])

            logger.info(shared_best_result)

        # convert Manager.dict to python's dict before exiting Manager context
        result = dict(shared_best_result)
    return result


if __name__ == '__main__':
    result = run_parallel()
    print(result)

Thanks for the help!

@mandarup changed the title from "processess get stuck" to "Manager.Pool gets stuck with billiard, works with multiprocessing" Sep 13, 2021
@mandarup changed the title from "Manager.Pool gets stuck with billiard, works with multiprocessing" to "Manager().Pool gets stuck with billiard, works with multiprocessing" Sep 13, 2021
@mandarup
Author

It seems the issue is with passing the lock to the child process:

def child_proc_simple(task_id):
    #time.sleep(1)
    print(task_id)

def child_proc_simple_with_lock(lock, task_id):
    #time.sleep(1)
    print(task_id)

def simple():
    with mproc.Manager() as manager:

        # create a variable shared across all worker processes
        shared_best_result = manager.dict()
        best_results_schema = {
            'task_id': None,
            'time': None,
            'best': np.inf,
        }
        for k,v in best_results_schema.items():
            shared_best_result[k] = v

        lock = manager.Lock()
        
        with manager.Pool(processes=5) as pool:
            
            # this works (no lock passed):                         <------- WORKS
            result = pool.starmap(child_proc_simple, [(
                task_id,
            ) for task_id in range(10)])

            # this does not work, gets stuck                       <-------   FAILS
            result = pool.starmap(child_proc_simple_with_lock, [(
                lock,
                task_id,
            ) for task_id in range(10)])

            logger.info(shared_best_result)

        # convert Manager.dict to python's dict before exiting Manager context
        result = dict(shared_best_result)
    return result
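
One thing worth ruling out before blaming the pool is an argument-count mismatch, since starmap unpacks each tuple into positional arguments. The sketch below checks the call shapes in-process with itertools.starmap, which unpacks tuples the same way. It is only a sanity check under assumptions: threading.Lock stands in for manager.Lock(), the functions return values instead of printing, and nothing here exercises billiard or reproduces the hang.

```python
import functools
import itertools
import threading


def child_proc_simple(task_id):
    return ('no lock', task_id)


def child_proc_simple_with_lock(lock, task_id):
    with lock:
        return ('with lock', task_id)


# Stand-in for manager.Lock() so the calls can run in a single process.
lock = threading.Lock()

# One value per tuple matches child_proc_simple(task_id).
no_lock_results = list(
    itertools.starmap(child_proc_simple, [(t,) for t in range(3)]))

# Two values per tuple match child_proc_simple_with_lock(lock, task_id).
lock_results = list(
    itertools.starmap(child_proc_simple_with_lock,
                      [(lock, t) for t in range(3)]))

# functools.partial pre-binds the lock, so each tuple carries only task_id.
bound = functools.partial(child_proc_simple_with_lock, lock)
partial_results = list(itertools.starmap(bound, [(t,) for t in range(3)]))

# A (lock, task_id) tuple does not fit the one-argument function.
try:
    list(itertools.starmap(child_proc_simple, [(lock, 0)]))
    mismatch = None
except TypeError as exc:
    mismatch = type(exc).__name__
```

If the shapes check out here and the pool still hangs only when the lock is in the tuple, that points at how the lock proxy is handed to pool workers rather than at the call signature.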

@mandarup
Author

Using processes directly seems to work.

with mproc.Manager() as manager:
        # create a variable shared across all worker processes
        shared_best_result = manager.dict()
        best_results_schema = {
            'task_id': None,
            'time': None,
            'best': 1e6,
        }
        for k, v in best_results_schema.items():
            shared_best_result[k] = v

        lock = manager.Lock()

        processes = [
            mproc.Process(target=child_proc,
                          args=(lock, task_id, shared_best_result,))
            for task_id in range(10)
        ]
        for process in processes:
            process.start()
        for process in processes:
            process.join()

This is a workaround; closing this for now.
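
For reference, a self-contained version of the Process-based workaround might look like the sketch below. It uses the standard-library multiprocessing module so it runs without extra dependencies, and whether it behaves identically under billiard is not verified here. The run_with_processes name and its start_method parameter are hypothetical additions for this sketch, and the numpy stress calculation is left out to keep it short.

```python
import multiprocessing as mp
import random


def child_proc(lock, task_id, shared_best_result):
    """Record this task's random draw if it beats the shared best."""
    rnd = random.randint(1, 1000)
    with lock:
        if rnd < shared_best_result['best']:
            shared_best_result['best'] = rnd
            shared_best_result['task_id'] = task_id


def run_with_processes(num_tasks=4, start_method=None):
    """Run child_proc in one Process per task and return the best result.

    start_method=None keeps the platform default start method; the
    parameter is a hypothetical knob added for this sketch.
    """
    ctx = mp.get_context(start_method)
    with ctx.Manager() as manager:
        # Shared state lives in the manager's server process.
        shared_best_result = manager.dict(
            {'task_id': None, 'time': None, 'best': float('inf')})
        lock = manager.Lock()

        processes = [
            ctx.Process(target=child_proc,
                        args=(lock, task_id, shared_best_result))
            for task_id in range(num_tasks)
        ]
        for process in processes:
            process.start()
        for process in processes:
            process.join()

        # Copy into a plain dict before the manager shuts down.
        return dict(shared_best_result)
```

As in the original script, the call to run_with_processes() should sit under an `if __name__ == '__main__':` guard, which matters on platforms that start children with spawn (the default on macOS).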
