Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async progress bar #6

Closed
axelande opened this issue May 12, 2021 · 5 comments
Closed

Async progress bar #6

axelande opened this issue May 12, 2021 · 5 comments

Comments

@axelande
Copy link

Thanks for a nice lib! It would be really nice if it were possible to update the progress bar asynchronously during a long Python calculation. Is this possible — or is it already supported today and I just missed how to implement it correctly?

@redhog
Copy link
Member

redhog commented May 13, 2021

It is possible already!

I didn't include the background task code itself in the library yet, as it's a bit rough around the edges still and therefore still part of our in house app, not the library.

That said, I'm pasting it here as-is (licensed as the rest of the library) so that you can adapt it to your code base. Just note that something along these lines will be included in the library in the future, but possibly with a slightly different API:

import threading
import traceback
import tornado.gen
import logging
import bokeh_garden.logging_handler

class BackgroundTask(threading.Thread):
    """Thread that runs ``task()`` in the background and relays progress to the UI.

    Progress, failure and completion are stored on the instance and then
    delivered through ``app.doc.add_next_tick_callback``, so the progress bar
    and the ``status()`` / ``fail()`` / ``done()`` hooks run in the UI
    foreground thread while only ``task()`` runs in the background.

    ``app`` must provide ``doc`` (used for ``select`` and
    ``add_next_tick_callback``) and ``save_session()``. Extra keyword
    arguments are stored and forwarded to ``task()`` and to every hook.

    The thread starts itself at the end of ``__init__``.
    """

    def __init__(self, app, **kw):
        self.logger = logging.getLogger("%s.%s" % (type(self).__module__, type(self).__name__))
        self.app = app
        self.kw = kw
        # Defined up front so the wrappers never hit an AttributeError when the
        # document contains no widget tagged "task_progress_bars".
        self.bar = None
        self.result = None
        threading.Thread.__init__(self)
        for progress_bars in self.app.doc.select({"tags": ["task_progress_bars"]}):
            self.bar = progress_bars.add_bar()
        if self.bar is None:
            self.logger.warning("No widget tagged 'task_progress_bars' found; progress is not displayed")
        self.start()

    def run(self):
        """Thread body: run ``task()``, save the session, then schedule ``done``."""
        bokeh_garden.logging_handler.LoggingHandler.set_current_document(self.app.doc)
        # Log a truncated view of the arguments so large values do not flood the log.
        kw_log = {}
        for key, value in self.kw.items():
            kw_log[key] = str(value)[:30]
        self.logger.info('Running task : %s,', repr(kw_log))

        self.set_status(0, type(self).__name__)
        try:
            self.result = self.task(**self.kw)
            self.app.save_session()
        except Exception as e:
            self.set_failed(e, traceback.format_exc())
        else:
            self.app.doc.add_next_tick_callback(tornado.gen.coroutine(self.done_wrapper))

    def _set_status(self, percent_done, status_text=None):
        """Record the latest progress and clear any previously recorded failure."""
        self.error_exc = None
        self.error_tb = None
        self.percent_done = percent_done
        self.status_text = status_text

    def _set_failed(self, e, tb):
        """Record a failure (exception and formatted traceback) and report it."""
        self.error_exc = e
        self.error_tb = tb
        self.logger.error('%s:%s', e, tb)
        print(e)
        print(tb)

    def status_wrapper(self):
        """Push the recorded progress to the bar (if any) and the ``status`` hook."""
        if self.bar is not None:
            self.bar.set(self.percent_done, self.status_text)
        self.status(self.percent_done, self.status_text, **self.kw)

    def fail_wrapper(self):
        """Mark the bar (if any) as failed and call the ``fail`` hook."""
        if self.bar is not None:
            self.bar.fail()
        self.fail(self.error_exc, self.error_tb, **self.kw)

    def done_wrapper(self):
        """Call the ``done`` hook; report 100% on success or a failure if it raises."""
        self.logger.info('Task done')
        # Hand the task's return value to done(). It is passed by keyword so that
        # both the documented signature done(self, result, **kw) and overrides
        # declared as done(self, **kw) accept the call.
        call_kw = dict(self.kw)
        call_kw["result"] = self.result
        try:
            self.done(**call_kw)
        except Exception as e:
            self._set_failed(e, traceback.format_exc())
            self.fail_wrapper()
        else:
            self._set_status(100)
            self.status_wrapper()

    # Public API:
    # Subclasses must override task() and can override the rest of these methods.
    # Task runs in the background, all other in the UI foreground thread.
    def task(self, **kw): pass
    def status(self, percent_done, status, **kw): pass
    def fail(self, exc, tb, **kw): pass
    def done(self, result, **kw): pass

    # task() can call any of these methods to signal its status. Note
    # that set_status() clears any previous failure set by set_failed().

    def set_status(self, percent_done, status_text=None):
        """Record progress and schedule the UI update on the document."""
        self._set_status(percent_done, status_text)
        self.app.doc.add_next_tick_callback(tornado.gen.coroutine(self.status_wrapper))

    def set_failed(self, e, tb):
        """Record a failure and schedule the failure handling on the document."""
        self._set_failed(e, tb)
        self.app.doc.add_next_tick_callback(tornado.gen.coroutine(self.fail_wrapper))

Example task using the above:

class RunSlowCalculation(BackgroundTask):
    """Example task: three sub-steps with a progress update before each one."""

    def task(self, **kw):
        # Note: No calls to any bokeh ui stuff here! This is run in the background...
        self.set_status(0, 'First')
        do_first_subtask(**kw)
        self.set_status(30, 'Second')
        do_second_subtask(**kw)
        self.set_status(60, 'Third')
        do_third_subtask(**kw)
        self.set_status(99, 'Stuff done')

    def done(self, result=None, **kw):
        # Mirrors the base hook signature done(self, result, **kw); the default
        # keeps the override callable whether or not a result is supplied.
        something_updating_ui(self.app)

@axelande
Copy link
Author

Thanks for a very nice reply. I had a quick look at it, and I might be missing something obvious, but I do not see how to apply your suggestion to a tiny example like this (which I run with python -m bokeh serve file_name.py):

import time
from bokeh.io import curdoc
from bokeh.layouts import row, layout
from bokeh.models.widgets import Button
from bokeh_garden.progress_bar import ProgressBar
from bokeh_garden.application import AppWidget


class ExampleLayout:
    """A start button and a progress bar that the button steps from 0 to 100."""

    def __init__(self):
        self.button = self._get_button()
        self.progressbar = self._get_progress_bar()

    def _get_button(self) -> Button:
        """Create the start button and hook its click to ``start_progressbar``."""
        start_button = Button(label='Click to start the progressbar')
        start_button.on_click(self.start_progressbar)
        return start_button

    def _get_progress_bar(self):
        """Create the progress bar widget."""
        return ProgressBar(AppWidget)

    def start_progressbar(self):
        """Click handler: set the bar to 0, 10, ..., 100, sleeping 1 s after each step."""
        # The whole loop runs inside the click callback; it does not return
        # until every step (including the sleeps) has finished.
        for percent in range(0, 101, 10):
            self.progressbar.set(percent)
            time.sleep(1)


# Build the widgets, stack them in two rows and add them to the current document.
el = ExampleLayout()
lay = layout([
    row(el.button),
    row(el.progressbar),
])
doc = curdoc()
doc.add_root(lay)
doc.title = "Tiny app"

As the script runs now, when I click the button nothing happens for about 10 seconds, and then the progress bar jumps straight to complete. How can I include the BackgroundTask in this example and get a 10% increase every second?

@redhog
Copy link
Member

redhog commented May 14, 2021

So... the problem with your code is that you're busy-waiting in the start_progressbar() function, never releasing control back to Bokeh. You have two choices:

Run the code in a thread (like my BackgroundTask), or break it up into multiple smaller functions, and use curdoc().add_next_tick_callback() to call the next one.

@redhog
Copy link
Member

redhog commented May 14, 2021

For real long-running work, the next_tick_callback approach becomes really impractical, and I strongly suggest using threads. Note that background threads can not generally call bokeh methods, really only add_next_tick_callback, so your code would have to wrap the progressbar.set() call inside a function that you give to add_next_tick_callback (this is most of what my BackgroundTask class does).

@axelande
Copy link
Author

axelande commented May 21, 2021

Wow after some hours spent on this I finally got a small working example 😄

import logging
import threading
import time
import traceback
import tornado.gen

import bokeh_garden.logging_handler
from bokeh.io import curdoc
from bokeh.layouts import row, layout
from bokeh.models.widgets import Button
from bokeh_garden.progress_bar import ProgressBar
from bokeh_garden.application import AppWidget


class BackgroundTask(threading.Thread):
    """Thread that runs ``task()`` in the background and relays progress to the UI.

    Progress, failure and completion are stored on the instance and then
    delivered through ``app.doc.add_next_tick_callback``, so the progress bar
    and the ``status()`` / ``fail()`` / ``done()`` hooks run in the UI
    foreground thread while only ``task()`` runs in the background.

    ``app`` must provide ``doc`` (used for ``select`` and
    ``add_next_tick_callback``) and ``save_session()``. Extra keyword
    arguments are stored and forwarded to ``task()`` and to every hook.

    The thread is not started by ``__init__``; the caller calls ``start()``.
    """

    def __init__(self, app, **kw):
        self.logger = logging.getLogger(
            "%s.%s" % (type(self).__module__, type(self).__name__))
        self.app = app
        self.kw = kw
        # Defined up front so the wrappers never hit an AttributeError when the
        # document contains no widget tagged "task_progress_bars".
        self.bar = None
        threading.Thread.__init__(self)
        for progress_bars in self.app.doc.select(
                {"tags": ["task_progress_bars"]}):
            self.bar = progress_bars
        if self.bar is None:
            self.logger.warning(
                "No widget tagged 'task_progress_bars' found; "
                "progress is not displayed")

    def run(self):
        """Thread body: run ``task()``, save the session, then schedule ``done``."""
        bokeh_garden.logging_handler.LoggingHandler.set_current_document(
            self.app.doc)
        # Log a truncated view of the arguments so large values do not flood the log.
        kw_log = {}
        for key, value in self.kw.items():
            kw_log[key] = str(value)[:30]
        self.logger.info('Running task : %s,', repr(kw_log))

        self.set_status(0, type(self).__name__)
        try:
            self.result = self.task(**self.kw)
            self.app.save_session()
        except Exception as e:
            self.set_failed(e, traceback.format_exc())
        else:
            self.app.doc.add_next_tick_callback(
                tornado.gen.coroutine(self.done_wrapper))

    def _set_status(self, percent_done, status_text=None):
        """Record the latest progress and clear any previously recorded failure."""
        self.error_exc = None
        self.error_tb = None
        self.percent_done = percent_done
        self.status_text = status_text

    def _set_failed(self, e, tb):
        """Record a failure (exception and formatted traceback) and report it."""
        self.error_exc = e
        self.error_tb = tb
        self.logger.error('%s:%s', e, tb)
        print(e)
        print(tb)

    def status_wrapper(self):
        """Push the recorded progress to the bar (if any) and the ``status`` hook."""
        if self.bar is not None:
            self.bar.set(self.percent_done, self.status_text)
        self.status(self.percent_done, self.status_text, **self.kw)

    def fail_wrapper(self):
        """Mark the bar (if any) as failed and call the ``fail`` hook."""
        if self.bar is not None:
            self.bar.fail()
        self.fail(self.error_exc, self.error_tb, **self.kw)

    def done_wrapper(self):
        """Call ``done(result, **kw)``; report 100% on success or a failure if it raises."""
        self.logger.info('Task done')
        try:
            self.done(self.result, **self.kw)
        except Exception as e:
            self._set_failed(e, traceback.format_exc())
            self.fail_wrapper()
        else:
            self._set_status(100)
            self.status_wrapper()

    # Public API:
    # Subclasses must override task() and can override the rest of these methods.
    # Task runs in the background, all other in the UI foreground thread.
    def task(self, **kw):
        pass

    def status(self, percent_done, status, **kw):
        pass

    def fail(self, exc, tb, **kw):
        pass

    def done(self, result, **kw):
        pass

    # task() can call any of these methods to signal its status. Note
    # that set_status() clears any previous failure set by set_failed().

    def set_status(self, percent_done, status_text=None):
        """Record progress and schedule the UI update on the document."""
        self._set_status(percent_done, status_text)
        self.app.doc.add_next_tick_callback(
            tornado.gen.coroutine(self.status_wrapper))

    def set_failed(self, e, tb):
        """Record a failure and schedule the failure handling on the document."""
        self._set_failed(e, tb)
        self.app.doc.add_next_tick_callback(
            tornado.gen.coroutine(self.fail_wrapper))


def do_subtask(**kwargs):
    """Stand-in for one slow unit of work: ignore the arguments and block for 2 s."""
    delay_seconds = 2
    time.sleep(delay_seconds)


class RunSlowCalculation:
    """Tiny app: a button that runs a slow three-step task with a live progress bar.

    The object acts as the ``app`` of a ``BackgroundTask``: it exposes ``doc``
    and ``save_session()`` and plugs its own ``task``/``done``/``fail`` methods
    into the task instance.
    """

    def __init__(self):
        self.button = self._get_button()
        self.progressbar = self._get_progress_bar()
        lay = layout([row(self.button),
                      row(self.progressbar)])
        self.doc = curdoc()
        self.doc.add_root(lay)
        self._arm_task()

    def _arm_task(self):
        """Create a fresh, not-yet-started BackgroundTask wired to this object's hooks."""
        self.bk_task = BackgroundTask(self)
        self.bk_task.task = self.task
        self.bk_task.done = self.done
        self.bk_task.fail = self.fail

    def _get_button(self) -> Button:
        button = Button(label='Click to start the progressbar')
        button.on_click(self.start_progressbar)
        return button

    def _get_progress_bar(self):
        # The tag is what BackgroundTask selects on to find its progress bar.
        progressbar = ProgressBar(AppWidget, tags=['task_progress_bars'])
        return progressbar

    def start_progressbar(self):
        """Click handler: start the armed task unless it has already been started."""
        # A Thread can be started only once, and ``ident`` stays None until
        # start() is called. Ignore clicks that arrive before done()/fail() has
        # armed a new task, instead of raising RuntimeError.
        if self.bk_task.ident is not None:
            return
        self.bk_task.start()

    def task(self, **kw):
        """Background work: three sub-steps with a status update before each."""
        self.bk_task.set_status(0, 'First')
        do_subtask(**kw)
        self.bk_task.set_status(30, 'Second')
        do_subtask(**kw)
        self.bk_task.set_status(60, 'Third')
        do_subtask(**kw)
        self.bk_task.set_status(99, 'Stuff done')
        return 'result'

    def save_session(self):
        print('done')

    def done(self, result, **kwargs):
        """Completion hook: arm a new task so the button can be used again."""
        self._arm_task()

    def fail(self, exc, tb, **kwargs):
        """Failure hook: arm a new task so a failed run can be retried."""
        self._arm_task()


# Instantiating the class builds the layout and adds it to curdoc().
RunSlowCalculation()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants