dask.bag: allow printing result to stdout #1066

Closed
Jeffrey04 opened this Issue Mar 29, 2016 · 5 comments

@Jeffrey04

Suppose my use case is this: read input from stdin, process it, then write the result to stdout (so that it can be piped into another process). However, this cannot be done easily at the moment. I have cheated with .map_partitions(lambda partition: [print(item) for item in partition]), but due to the highly concurrent nature of dask this broke print(), which is not atomic (I sometimes get incomplete lines).

I then hacked dask.multiprocessing.get to make the manager a global option

    manager = _globals['manager'] or multiprocessing.Manager()

and then used dask.set_options() to replace the manager, so that I can have a single printer process consume from a queue instead of letting all N worker processes print concurrently, as follows:

import json
import sys
from multiprocessing import Manager, Process

import dask
import dask.bag as db

def output_printer(queue):
    # Single consumer: only this process writes to stdout,
    # so lines never interleave.
    while True:
        msg = queue.get()
        if msg == '_DONE':
            break
        print(json.dumps(msg))

def queue_send(partition, print_queue=None):
    # Push each processed record onto the shared queue.
    for item in partition:
        print_queue.put(item)

manager = Manager()
dask.set_options(manager=manager)  # relies on the hack above
print_queue = manager.Queue()
printer = Process(target=output_printer, args=(print_queue,))
printer.daemon = True
printer.start()

# process_the_jsonlines is my per-partition processing function
bag = db.from_sequence(sys.stdin, 500) \
    .map(json.loads) \
    .map_partitions(process_the_jsonlines) \
    .map_partitions(queue_send, print_queue=print_queue) \
    .compute()

print_queue.put('_DONE')  # sentinel: stop the printer
printer.join()

However, is there a better way to do this? It would be nice to be able to print each result right after it is computed (a to_stdout()?), instead of waiting until the whole collection has been processed. I don't mind if the output is out of order.

@jcrist

Member

jcrist commented Mar 29, 2016

The best way would probably be to fork off a process to do the actual printing, holding one end of a queue, and have each task put its results into the queue to be printed. This limits the synchronization cost that using a lock incurs. Assuming the printing process can keep up with the workers producing the data, this shouldn't flood your RAM either.
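A minimal sketch of that pattern (hypothetical names; db.range stands in for real data): a manager-backed queue is picklable, so it can be passed straight into map_partitions without touching dask internals.

from multiprocessing import Manager, Process

import dask.bag as db

def printer(queue):
    # Single consumer: only this process writes to stdout,
    # so lines never interleave.
    while True:
        msg = queue.get()
        if msg == '_DONE':
            break
        print(msg)

def queue_send(partition, queue=None):
    for item in partition:
        queue.put(item)
    return []

if __name__ == '__main__':
    manager = Manager()
    queue = manager.Queue()
    proc = Process(target=printer, args=(queue,))
    proc.start()

    db.range(1000, npartitions=10).map_partitions(queue_send, queue=queue).compute()

    queue.put('_DONE')  # sentinel: tell the printer to stop
    proc.join()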

A cheap option though is just to use a manager to create a lock. The following works for me:

from multiprocessing import Manager

import dask.bag as db

manager = Manager()
lock = manager.Lock()  # manager-created locks can be pickled and shared
data = db.range(1000, npartitions=10)

def to_stdout(partition, lock=lock):
    # Hold the lock for the whole partition so its lines print contiguously.
    with lock:
        for row in partition:
            print(row)
    return []  # return an empty partition so compute() has something to concatenate

data.map_partitions(to_stdout).compute()

Since grabbing a lock is mildly expensive, we do it once for each chunk rather than once for each record. If this is fast enough for you, then this is the solution I'd recommend.


@Jeffrey04

Jeffrey04 commented Mar 30, 2016

I notice dask.multiprocessing.get contains these lines:

manager = multiprocessing.Manager()
queue = manager.Queue()

Just out of curiosity, is it OK to instantiate another multiprocessing.Manager() outside that function? Anyway, the code snippet works (:


@jcrist

Member

jcrist commented Mar 30, 2016

Yes. In general the multiprocessing scheduler (dask.multiprocessing.get) works with any functions/arguments that can be serialized using pickle (well, cloudpickle actually) and sent between processes. Normal locks don't support this, but locks created by a manager do (https://docs.python.org/2/library/multiprocessing.html#managers). We use a manager inside the scheduler for its internal queue, but you can definitely create another manager for use in your own code.
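A quick way to see this (a sketch; a plain Lock refuses to pickle, while the manager's lock proxy serializes fine):

import pickle
from multiprocessing import Lock, Manager

if __name__ == '__main__':
    manager = Manager()

    try:
        pickle.dumps(Lock())  # plain locks may only be shared through inheritance
    except RuntimeError as err:
        print('plain Lock:', err)

    payload = pickle.dumps(manager.Lock())  # manager proxies pickle fine
    print('manager Lock pickled to', len(payload), 'bytes')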


@mrocklin

Member

mrocklin commented Mar 30, 2016

I often use the locket library for file-based locks. They're pretty robust and cross platform.
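For instance, a hedged sketch of the same to_stdout pattern using locket (assuming locket is installed; the lock-file path here is arbitrary):

import locket

import dask.bag as db

def to_stdout(partition, lock_path='/tmp/print.lock'):
    # locket serializes access across processes through an on-disk lock file
    with locket.lock_file(lock_path):
        for row in partition:
            print(row)
    return []

db.range(1000, npartitions=10).map_partitions(to_stdout).compute()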


@Jeffrey04

Jeffrey04 commented Mar 30, 2016

@jcrist yeah, I knew locks/queues need to be created by multiprocessing.Manager; I just wasn't sure whether it is a good idea to create another one for this purpose. Thanks (:

