Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for scheduling global callbacks #2661

Merged
merged 13 commits into from Mar 8, 2022
60 changes: 60 additions & 0 deletions examples/user_guide/Deploy_and_Export.ipynb
Expand Up @@ -432,6 +432,8 @@
" Whether to serve session info on the REST API\n",
" --session-history SESSION_HISTORY\n",
" The length of the session history to record.\n",
" --setup\n",
" Path to a setup script to run before server starts, e.g. to cache data or set up scheduled tasks.\n",
"\n",
"To turn a notebook into a deployable app simply append ``.servable()`` to one or more Panel objects, which will add the app to Bokeh's ``curdoc``, ensuring it can be discovered by Bokeh server on deployment. In this way it is trivial to build dashboards that can be used interactively in a notebook and then seamlessly deployed on Bokeh server.\n",
"\n",
Expand Down Expand Up @@ -547,6 +549,64 @@
"This way we can create a global indicator for the busy state instead of modifying all our callbacks."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Scheduling task with `pn.state.schedule_task`\n",
"\n",
"The `pn.state.schedule_task` functionality allows scheduling global tasks at certain times or on a specific schedule. This is distinct from periodic callbacks, which are scheduled per user session. Global tasks are useful for performing periodic actions like updating cached data, performing cleanup actions or other housekeeping tasks, while periodic callbacks should be reserved for making periodic updates to an application.\n",
"\n",
"The different contexts in which global tasks and periodic callbacks run also has implications on how they should be scheduled. Scheduled task **must not** be declared in the application code itself, i.e. if you are serving `panel serve app.py` the callback you are scheduling must not be declared in the `app.py`. It must be defined in an external module or in a separate script declared as part of the `panel serve` invocation using the `--setup` commandline argument.\n",
"\n",
"Scheduling using `pn.state.schedule_task` is idempotent, i.e. if a callback has already been scheduled under the same name subsequent calls will have no effect. By default the starting time is immediate but may be overridden with the `at` keyword argument. The period may be declared using the `period` argument or a cron expression (which requires the `croniter` library). Note that the `at` time should be in local time but if a callable is provided it must return a UTC time. If `croniter` is installed a `cron` expression can be provided using the `cron` argument.\n",
"\n",
"As a simple example of a task scheduled at a fixed interval:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import datetime as dt\n",
"import asyncio\n",
"\n",
"async def task():\n",
" print(f'Task executed at: {dt.datetime.now()}')\n",
"\n",
"pn.state.schedule_task('task', task, period='1s')\n",
"await asyncio.sleep(3)\n",
"\n",
"pn.state.cancel_task('task')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Note that while both `async` and regular callbacks are supported, asynchronous callbacks are preferred if you are performing any I/O operations to avoid interfering with any running applications.\n",
"\n",
"If you have the `croniter` library installed you may also provide a cron expression, e.g. the following will schedule a task to be repeated at 4:02 am every Monday and Friday:"
philippjfr marked this conversation as resolved.
Show resolved Hide resolved
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pn.state.schedule_task('task', task, cron='2 4 * * mon,fri')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"See [crontab.guru](https://crontab.guru/) and the [`croniter` README](https://github.com/kiorky/croniter#introduction) to learn about cron expressions genrally and special syntax supported by `croniter`."
]
},
{
"cell_type": "markdown",
"metadata": {},
Expand Down
2 changes: 2 additions & 0 deletions examples/user_guide/Overview.ipynb
Expand Up @@ -221,8 +221,10 @@
"> #### Methods\n",
"> \n",
"> - `as_cached`: Allows caching data across sessions by memoizing on the provided key and keyword arguments to the provided function.\n",
"> - `cancel_scheduled`: Cancel a scheduled task by name.\n",
philippjfr marked this conversation as resolved.
Show resolved Hide resolved
"> - `add_periodic_callback`: Schedules a periodic callback to be run at an interval set by the period\n",
"> - `kill_all_servers`: Stops all running server sessions.\n",
"> - `schedule`: Schedule a callback periodically at a specific time (click [here](./Deploy_and_Export.ipynb#pn.state.schedule) for more details)\n",
philippjfr marked this conversation as resolved.
Show resolved Hide resolved
"> - `onload`: Allows defining a callback which is run when a server is fully loaded\n",
"> - `sync_busy`: Sync an indicator with a boolean value parameter to the busy property on state"
]
Expand Down
2 changes: 1 addition & 1 deletion examples/user_guide/Performance_and_Debugging.ipynb
Expand Up @@ -43,7 +43,7 @@
"data = pn.state.as_cached('data', load_data, *args, **kwargs)\n",
"```\n",
"\n",
"The first time the app is loaded the data will be cached and subsequent sessions will simply look up the data in the cache, speeding up the process of rendering. If you want to warm up the cache before the first user visits the application you can also provide the `--warm` argument to the `panel serve` command, which will ensure the application is initialized once on launch."
"The first time the app is loaded the data will be cached and subsequent sessions will simply look up the data in the cache, speeding up the process of rendering. If you want to warm up the cache before the first user visits the application you can also provide the `--warm` argument to the `panel serve` command, which will ensure the application is initialized as soon as it is launched. If you want to populate the cache in a separate script from your main application you may also provide the path to a setup script using the `--setup` argument to `panel serve`. If you want to periodically update the cache look into the ability to [schedule tasks](Deploy_and_Export.ipynb#Scheduling-task-with-pn.state.schedule_task)."
]
},
{
Expand Down
19 changes: 19 additions & 0 deletions panel/command/serve.py
Expand Up @@ -9,6 +9,7 @@
import os

from glob import glob
from types import ModuleType

from bokeh.command.subcommands.serve import Serve as _BkServe
from bokeh.command.util import build_single_handler_applications
Expand Down Expand Up @@ -185,6 +186,12 @@ class Serve(_BkServe):
type = int,
help = "Whether to start a thread pool which events are dispatched to.",
default = None
)),
('--setup', dict(
action = 'store',
type = str,
help = "Path to a setup script to run before server starts.",
default = None
))
)

Expand Down Expand Up @@ -252,6 +259,18 @@ def customize_kwargs(self, args, server_kwargs):
for f in files:
watch(f)

if args.setup:
setup_path = args.setup
with open(setup_path) as f:
setup_source = f.read()
nodes = ast.parse(setup_source, os.fspath(setup_path))
code = compile(nodes, filename=setup_path, mode='exec', dont_inherit=True)
module_name = 'panel_setup_module'
module = ModuleType(module_name)
module.__dict__['__file__'] = os.path.abspath(setup_path)
exec(code, module.__dict__)
state._setup_module = module

if args.warm or args.autoreload:
argvs = {f: args.args for f in files}
applications = build_single_handler_applications(files, argvs)
Expand Down
153 changes: 151 additions & 2 deletions panel/io/state.py
Expand Up @@ -2,24 +2,28 @@
Various utilities for recording and embedding state in a rendered app.
"""
import datetime as dt
import inspect
import logging
import json
import threading
import time

from collections.abc import Iterator
from collections import OrderedDict, defaultdict
from contextlib import contextmanager
from weakref import WeakKeyDictionary
from functools import partial
from urllib.parse import urljoin
from weakref import WeakKeyDictionary

import param

from bokeh.document import Document
from bokeh.io import curdoc as _curdoc
from pyviz_comms import CommManager as _CommManager
from tornado.ioloop import IOLoop
from tornado.web import decode_signed_value

from ..util import base64url_decode
from ..util import base64url_decode, parse_timedelta
from .logging import LOG_SESSION_RENDERED, LOG_USER_MSG

_state_logger = logging.getLogger('panel.state')
Expand Down Expand Up @@ -112,6 +116,12 @@ class _state(param.Parameterized):
_onload = WeakKeyDictionary()
_on_session_created = []

# Module that was run during setup
_setup_module = None

# Scheduled callbacks
_scheduled = {}

# Indicators listening to the busy state
_indicators = []

Expand Down Expand Up @@ -222,6 +232,24 @@ def _on_load(self, doc=None):
self._profiles[(path+':on_load', config.profiler)] += sessions
self.param.trigger('_profiles')

async def _scheduled_cb(self, name):
if name not in self._scheduled:
return
diter, cb = self._scheduled[name]
try:
at = next(diter)
except Exception:
at = None
del self._scheduled[name]
if at is not None:
ioloop = IOLoop.current()
now = dt.datetime.now().timestamp()
call_time_seconds = (at - now)
ioloop.call_later(delay=call_time_seconds, callback=partial(self._scheduled_cb, name))
res = cb()
if inspect.isawaitable(res):
await res

#----------------------------------------------------------------
# Public Methods
#----------------------------------------------------------------
Expand Down Expand Up @@ -309,6 +337,24 @@ def add_periodic_callback(self, callback, period=500, count=None,
cb.start()
return cb

def cancel_task(self, name, wait=False):
"""
Cancel a task scheduled using the `state.schedule_task` method by name.

Arguments
---------
name: str
The name of the scheduled task.
wait: boolean
Whether to wait until after the next execution.
"""
if name not in self._scheduled:
raise KeyError(f'No task with the name {name!r} has been scheduled.')
if wait:
self._scheduled[name] = (None, self._scheduled[name][1])
else:
del self._scheduled[name]

def get_profile(self, profile):
"""
Returns the requested profiling output.
Expand Down Expand Up @@ -423,6 +469,109 @@ def publish(self, endpoint, parameterized, parameters=None):
self._rest_endpoints[endpoint] = ([parameterized], parameters, cb)
parameterized.param.watch(cb, parameters)

def schedule_task(self, name, callback, at=None, period=None, cron=None):
"""
Schedules a task at a specific time or on a schedule.

By default the starting time is immediate but may be
overridden with the `at` keyword argument. The scheduling may
be declared using the `period` argument or a cron expression
(which requires the `croniter` library). Note that the `at`
time should be in local time but if a callable is provided it
must return a UTC time.

Note that the scheduled callback must not be defined within a
script served using `panel serve` because the script and all
its contents are cleaned up when the user session is
destroyed. Therefore the callback must be imported from a
separate module or should be scheduled from a setup script
(provided to `panel serve` using the `--setup` argument). Note
also that scheduling is idempotent, i.e. if a callback has
already been scheduled under the same name subsequent calls
will have no effect. This is ensured that even if you schedule
a task from within your application code, the task is only
scheduled once.

Arguments
---------
name: str
Name of the scheduled task
callback: callable
Callback to schedule
at: datetime.datetime, Iterator or callable
Declares a time to schedule the task at. May be given as a
datetime or an Iterator of datetimes in the local timezone
declaring when to execute the task. Alternatively may also
declare a callable which is given the current UTC time and
must return a datetime also in UTC.
period: str or datetime.timedelta
The period between executions, may be expressed as a
timedelta or a string:

- Week: '1w'
- Day: '1d'
- Hour: '1h'
- Minute: '1m'
- Second: '1s'

cron: str
A cron expression (requires croniter to parse)
"""
if name in self._scheduled:
if callback is not self._scheduled[name][1]:
self.param.warning(
"A separate task was already scheduled under the "
f"name {name!r}. The new task will be ignored. If "
"you want to replace the existing task cancel it "
"with `state.cancel_task({name!r})` before adding "
"adding a new task under the same name."
)
return
ioloop = IOLoop.current()
if getattr(callback, '__module__', '').startswith('bokeh_app_'):
raise RuntimeError(
"Cannot schedule a task from within the context of an "
"application. Either import the task callback from a "
"separate module or schedule the task from a setup "
"script that you provide to `panel serve` using the "
"--setup commandline argument."
)
if cron is None:
if isinstance(period, str):
period = parse_timedelta(period)
def dgen():
if isinstance(at, Iterator):
while True:
new = next(at)
yield new.timestamp()
elif callable(at):
while True:
new = at(dt.datetime.utcnow())
if new is None:
raise StopIteration
yield new.replace(tzinfo=dt.timezone.utc).astimezone().timestamp()
elif period is None:
yield at.timestamp()
raise StopIteration
new_time = at or dt.datetime.now()
while True:
yield new_time.timestamp()
new_time += period
diter = dgen()
else:
from croniter import croniter
base = dt.datetime.now() if at is None else at
diter = croniter(cron, base)
now = dt.datetime.now().timestamp()
try:
call_time_seconds = (next(diter) - now)
except StopIteration:
return
self._scheduled[name] = (diter, callback)
ioloop.call_later(
delay=call_time_seconds, callback=partial(self._scheduled_cb, name)
)

def sync_busy(self, indicator):
"""
Syncs the busy state with an indicator with a boolean value
Expand Down
1 change: 1 addition & 0 deletions panel/tests/conftest.py
Expand Up @@ -181,6 +181,7 @@ def server_cleanup():
state._locations.clear()
state._curdoc = None
state.cache.clear()
state._scheduled.clear()
if state._thread_pool is not None:
state._thread_pool.shutdown(wait=False)
state._thread_pool = None
Expand Down