
Question: multiprocessing, logging and progressbar #267

Closed
wabiloo opened this issue Feb 1, 2022 · 3 comments

wabiloo commented Feb 1, 2022

Description

I am trying to work out a way of combining multiprocessing (with several processes), logging and your progressbar.
The idea is that there will be multiple processes, each working on separate tasks, and I'd like a progress bar that shows overall progress across all of them, whilst not preventing logs (and stdout) from being shown on screen.

However, I am not managing to find a way to get a nice progressbar at the bottom of the console whilst all the processes send their logging information. I was hoping that using a QueueHandler with a dedicated listener process, plus a separate process to receive "status" information and manage the progress bar, would work, but it does not...

I started from the example given at https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes and modified it into the following code.

It's fair to say the question is probably not strictly related to the use of your progressbar, but I'm wondering if you'd have any suggestion...

import logging
import logging.handlers
import multiprocessing

from random import choice, random
import time

import progressbar
progressbar.streams.wrap_stderr()
progressbar.streams.wrap_stdout()

def listener_configurer():
    root = logging.getLogger()
    # h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 3)
    # f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    # h.setFormatter(f)
    # root.addHandler(h)

def listener_process(queue, configurer):
    configurer()
    while True:
        try:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)

# Arrays used for random selections in this demo

LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
          logging.ERROR, logging.CRITICAL]

LOGGERS = ['a.b.c', 'd.e.f']

MESSAGES = [
    'Random message #1',
    'Random message #2',
    'Random message #3',
]

def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)

def worker_process(queue, configurer, status_queue):
    configurer(queue)
    name = multiprocessing.current_process().name
    # print('Worker started: %s' % name)

    for i in range(10):
        time.sleep(random())
        logger = logging.getLogger(choice(LOGGERS))
        level = choice(LEVELS)
        message = choice(MESSAGES)
        logger.log(level, message)

        status_queue.put(i)
    # print('Worker finished: %s' % name)

def status_updater_process(queue):
    cpt = 0
    with progressbar.ProgressBar(max_value=progressbar.UnknownLength) as bar:

        while True:
            next = queue.get()
            if next is None:  # We send this as a sentinel to tell the listener to quit.
                break

            cpt += 1
            bar.update(cpt)

def main():
    status_queue = multiprocessing.Queue()
    status_worker = multiprocessing.Process(target=status_updater_process,
                                            args=(status_queue, ))
    status_worker.start()

    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    workers = []
    for i in range(10):
        worker = multiprocessing.Process(target=worker_process,
                                         args=(queue, worker_configurer, status_queue))
        workers.append(worker)
        worker.start()
    for w in workers:
        w.join()
    queue.put_nowait(None)
    status_queue.put_nowait(None)
    listener.join()
    status_worker.join()

if __name__ == '__main__':
    main()


wolph commented Feb 2, 2022

I'm guessing that the issue is that you are using separate processes for the logging and the progress bar. The progressbar only redirects the stderr output while it's running and it cannot access the stderr from another process.

So, if we combine the status_updater_process with the listener_process, it should work. To do this, we have a couple of options:

  1. Send more advanced messages through the queue which contain either a progressbar update or a log message
  2. Listen to both queues from a single loop, alternating between them (a rough sketch of this follows below)
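
For reference, a rough (untested) sketch of option 2 could look like this: one process polling both queues in turn. It reuses the progressbar, logging and time imports from the example above; queue.Empty is what multiprocessing.Queue.get_nowait() raises when there is nothing to read, and the function name combined_listener_process is made up for illustration.

import queue  # for queue.Empty

def combined_listener_process(log_queue, status_queue, configurer):
    configurer()
    cpt = 0
    with progressbar.ProgressBar(max_value=progressbar.UnknownLength) as bar:
        while True:
            # Drain any pending log records without blocking.
            try:
                record = log_queue.get_nowait()
                if record is None:  # sentinel: stop listening (handling simplified)
                    return
                logging.getLogger(record.name).handle(record)
            except queue.Empty:
                pass

            # Then check the status queue and advance the bar.
            try:
                if status_queue.get_nowait() is None:  # sentinel: stop listening
                    return
                cpt += 1
                bar.update(cpt)
            except queue.Empty:
                time.sleep(0.01)  # avoid busy-waiting when both queues are idle

Option 1 avoids this polling loop entirely by pushing both kinds of message through a single queue.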

I think option 1 is the best, so let's give that a try :)

import dataclasses
import progressbar
progressbar.streams.wrap_stderr()
progressbar.streams.wrap_stdout()

import logging
import logging.handlers
import multiprocessing

from random import choice, random
import time


@dataclasses.dataclass
class ProgressBarUpdate:
    status: int


def listener_configurer():
    root = logging.getLogger()
    # h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 3)
    # f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    # h.setFormatter(f)
    # root.addHandler(h)

def listener_process(queue, configurer):
    configurer()
    cpt = 0  # counts progress updates across all workers
    with progressbar.ProgressBar(max_value=progressbar.UnknownLength) as bar:
        while True:
            try:
                record = queue.get()
                if record is None:  # We send this as a sentinel to tell the listener to quit.
                    break

                if isinstance(record, ProgressBarUpdate):
                    cpt += 1
                    bar.update(cpt)
                    continue
                else:
                    logger = logging.getLogger(record.name)
                    logger.handle(record)  # No level or filter logic applied - just do it!
            except Exception:
                import sys, traceback
                print('Whoops! Problem:', file=sys.stderr)
                traceback.print_exc(file=sys.stderr)

# Arrays used for random selections in this demo

LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
          logging.ERROR, logging.CRITICAL]

LOGGERS = ['a.b.c', 'd.e.f']

MESSAGES = [
    'Random message #1',
    'Random message #2',
    'Random message #3',
]

def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)

def worker_process(queue, configurer):
    configurer(queue)
    name = multiprocessing.current_process().name
    # print('Worker started: %s' % name)

    for i in range(10):
        time.sleep(random())
        logger = logging.getLogger(choice(LOGGERS))
        level = choice(LEVELS)
        message = choice(MESSAGES)
        logger.log(level, message)

        queue.put(ProgressBarUpdate(i))
    # print('Worker finished: %s' % name)

def main():
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    workers = []
    for i in range(10):
        worker = multiprocessing.Process(target=worker_process,
                                         args=(queue, worker_configurer))
        workers.append(worker)
        worker.start()
    for w in workers:
        w.join()
    queue.put_nowait(None)
    listener.join()

if __name__ == '__main__':
    main()


wabiloo commented Feb 3, 2022

Your explanation makes sense, and I had actually thought of doing something like that.
It doesn't seem "clean" though: one process in charge of two radically different things goes against the separation of concerns a bit...

Still, I think it's got legs and I will try and implement it in the real project.
Thanks for your help!


wolph commented Feb 3, 2022

Think of it this way: you'll have a single process responsible for all user interaction.

That is actually how most GUI libraries work as well: a single process handles all user interaction and should always remain responsive, while background processes take care of all heavy and/or blocking tasks.

You could still split the output process into multiple threads or asyncio tasks, of course, as long as they can share the same stderr stream.
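
To illustrate that last point, a rough (untested) sketch of such an output process, splitting the work across two threads that share the same stderr, could look like this. It reuses the queues, configurer and imports from the examples above; the names output_process, handle_logs and handle_progress are made up for illustration.

import threading

def output_process(log_queue, status_queue, configurer):
    configurer()

    def handle_logs():
        # Drain log records until the sentinel (None) arrives.
        while True:
            record = log_queue.get()
            if record is None:
                break
            logging.getLogger(record.name).handle(record)

    def handle_progress():
        # Drive the progress bar from the status queue until the sentinel (None) arrives.
        cpt = 0
        with progressbar.ProgressBar(max_value=progressbar.UnknownLength) as bar:
            while True:
                if status_queue.get() is None:
                    break
                cpt += 1
                bar.update(cpt)

    threads = [threading.Thread(target=handle_logs),
               threading.Thread(target=handle_progress)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

Both threads live in the same process, so the progressbar's stderr redirection covers the log output as well.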

github-actions bot added the Stale label Aug 21, 2023
github-actions bot closed this as not planned Aug 28, 2023