A lightweight helper for bridging sync and async code, built on asyncio and ThreadPoolExecutor.

Asyncio is powerful, but it has a cascading problem: the moment you want to
await something, that function has to be async, which means everything
calling it has to be async, and so on until your entire codebase has been
rewritten around it. If you're adding parallelism to an existing project, or
just want to speed up one specific bottleneck, that cost is hard to justify.

Dovetail's advantage isn't raw performance: asyncio.gather will always be
at least as fast, since Dovetail is built on top of it. Its advantage is a
surgical approach: you can parallelise one function in an otherwise
synchronous codebase without touching anything else.

    # Existing sync code, completely unchanged around this one call
    def process_report():
        data = dvt.task.map_blocking(fetch_url, urls)  # parallel, but sync caller
        return summarise(data)

With raw asyncio you can't do this: fetch_url being async forces
process_report to be async, which forces everything above it to be async.
Dovetail breaks that chain. If you're starting a greenfield project and are
comfortable with async/await throughout, you probably don't need it. Dovetail
is for developers stuck between "I want parallelism" and "I don't want to rewrite everything."
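
To make the "sync caller, parallel work" idea concrete, here is a minimal standard-library sketch of the pattern. It is not Dovetail's implementation; `map_in_threads` and this `process_report` are hypothetical names used only for illustration, and the sketch assumes the worker function is an ordinary blocking callable.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def map_in_threads(func, items, max_workers=8):
    """Run func over items in parallel threads; the caller stays synchronous."""

    async def _gather_all():
        loop = asyncio.get_running_loop()
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futures = [loop.run_in_executor(pool, func, item) for item in items]
            # gather returns results in input order, not completion order
            return await asyncio.gather(*futures)

    # asyncio.run owns the event loop for the duration of this one call,
    # so nothing above this function needs to become async.
    return asyncio.run(_gather_all())


def process_report(values):
    # An ordinary sync function: no async/await leaks into its signature.
    doubled = map_in_threads(lambda v: v * 2, values)
    return sum(doubled)
```

The event loop exists only inside `map_in_threads`, which is why the async "colour" stops there instead of spreading to every caller.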

    pip install pydovetail

Note: The install name is `pydovetail`, but the import name is `dovetail`.

    from dovetail import Dovetail

Preferred usage: create and manage a Dovetail instance with a context manager.
Sync (recommended for most users):

    from dovetail import Dovetail

    with Dovetail(max_workers=8) as dvt:
        # Sync caller -> sync function (blocks)
        result = dvt.task.run_blocking(fetch_data)

        # Sync caller -> sync function in threadpool (blocks)
        result = dvt.task.to_thread_blocking(fetch_data, "target.json")

        # Sync caller -> batch map in threadpool (blocks, bounded concurrency)
        results = dvt.task.map_blocking(fetch_data, ["a.json", "b.json"], max_concurrency=4)

        # Sync caller -> async coroutine (blocks until it completes)
        value = dvt.task.run_blocking(fetch_data_async())

Async (use `async with` in async applications):

    import asyncio

    async def main():
        async with Dovetail() as dvt:
            # Schedule coroutines as tasks, then await them together
            tasks = [dvt.task.schedule(fetch_async(i)) for i in items]
            results = await asyncio.gather(*tasks)

Manual / library patterns (caller-managed ownership):

    from typing import Optional

    # Manual creation: by default Dovetail will attempt to shut down on
    # interpreter exit and on SIGINT/SIGTERM. You can opt out by passing
    # `shutdown_on_exit=False` when constructing the instance.
    dvt = Dovetail()  # shutdown_on_exit enabled by default
    try:
        results = dvt.task.map_blocking(fetch, items)
    finally:
        # Explicit shutdown is still allowed and idempotent
        dvt.shutdown()

    # Library-author pattern (prefer caller-managed instance when possible)
    def process(items, dvt: Optional[Dovetail] = None):
        if dvt is None:
            # No instance supplied: create a short-lived one and clean it up
            with Dovetail() as _dvt:
                return _dvt.task.map_blocking(worker, items)
        return dvt.task.map_blocking(worker, items)

Note:
`Dovetail` will, by default, register a best-effort shutdown handler that calls `shutdown()` on interpreter exit and when `SIGINT` or `SIGTERM` is received. You can opt out with `shutdown_on_exit=False`. Explicitly calling `dvt.shutdown()` is still supported and idempotent.

Sync context manager (recommended for synchronous code):

    with Dovetail(max_workers=8) as dvt:
        results = dvt.task.map_blocking(fetch, items)
    # threadpool shut down on block exit

Async context manager (recommended for async code):

    async def main():
        async with Dovetail() as dvt:
            tasks = [dvt.task.schedule(coro(i)) for i in items]
            await asyncio.gather(*tasks)
        # __aexit__ runs shutdown off the event loop to avoid blocking

Implementation note:
`Dovetail` supports both `__enter__`/`__exit__` and `__aenter__`/`__aexit__`. The async exit runs `shutdown()` in a background executor to avoid blocking the running loop. `shutdown()` is idempotent and safe to call multiple times.
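
The dual context-manager behaviour described above can be sketched with the standard library alone. This is an illustration under assumptions, not Dovetail's source: `PoolOwner` and its `shutdown_calls` counter are hypothetical, and the lock-plus-flag approach is just one common way to make a shutdown method idempotent.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


class PoolOwner:
    """Hypothetical stand-in for an object that owns a thread pool."""

    def __init__(self, max_workers=4):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._lock = threading.Lock()
        self._closed = False
        self.shutdown_calls = 0  # counts real shutdowns, for demonstration only

    def shutdown(self):
        # Idempotent: only the first call does any work.
        with self._lock:
            if self._closed:
                return
            self._closed = True
        self._pool.shutdown(wait=True)
        self.shutdown_calls += 1

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.shutdown()
        return False  # never swallow exceptions

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # shutdown() may block while workers finish, so run it on the loop's
        # default executor instead of on the event loop thread.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self.shutdown)
        return False
```

Calling `shutdown()` again after either kind of exit is a no-op, which is the property that makes a `try`/`finally` shutdown safe to combine with a context manager.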
Task execution
- `dvt.task.to_thread(func, *args, **kwargs)`: await a sync callable in Dovetail's threadpool from async code; supports default timeout/retry settings.
- `dvt.task.to_thread_blocking(func, *args, **kwargs)`: synchronous wrapper for `to_thread` when you are not already in an event loop.
- `dvt.task.schedule(coro_or_callable, *args, **kwargs)`: create and return an `asyncio.Task` from a coroutine object, async function, or sync callable.
- `dvt.task.run_blocking(func_or_coro, *args, **kwargs)`: run async or sync work from synchronous code. Do not call from inside a running event loop.
- `dvt.task.map_blocking(func, items, max_concurrency=None, return_exceptions=False)`: run an ordered parallel map over `items` using the threadpool.
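
As a rough illustration of what an ordered map with `max_concurrency` and `return_exceptions` can mean, the sketch below uses only asyncio primitives. `bounded_map` is a hypothetical function, not part of Dovetail, and the choice of a semaphore to cap concurrency is an assumption about one reasonable design.

```python
import asyncio


def bounded_map(func, items, max_concurrency=None, return_exceptions=False):
    """Ordered parallel map over items with an optional concurrency cap."""
    items = list(items)

    async def _run():
        # With no cap, allow one slot per item (the default executor's own
        # worker limit still applies).
        limit = max_concurrency or max(1, len(items))
        gate = asyncio.Semaphore(limit)

        async def _one(item):
            async with gate:
                # asyncio.to_thread runs the sync callable in the default pool
                return await asyncio.to_thread(func, item)

        # gather keeps results in input order; with return_exceptions=True,
        # failures are returned in place instead of being raised.
        return await asyncio.gather(
            *(_one(item) for item in items),
            return_exceptions=return_exceptions,
        )

    return asyncio.run(_run())
```

Because this sketch relies on `asyncio.run`, it shares the restriction noted for `run_blocking`: `asyncio.run` raises `RuntimeError` when a loop is already running in the same thread.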
Events
- `dvt.events.on_queued(...)`: fires when a task is queued.
- `dvt.events.on_start(...)`: fires when a task starts executing.
- `dvt.events.on_end(...)`: fires when a task resolves (including after retries).
- `dvt.events.on_error(...)`: fires when a task raises an exception.
- `dvt.events.on_retry(...)`: fires on each retry attempt.
- `dvt.events.on_cancel(...)`: fires when a task is cancelled.
- `dvt.events.stats()`: returns cumulative counters: `queued`, `started`, `done`, `error`, `retries`, `throttled`.
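
The relationship between lifecycle events and cumulative counters can be pictured with a small sketch. Everything here is hypothetical (`EventHub`, `run_with_events`, the payload shape), and the ordering shown, where an error event fires on every failed attempt, is an assumption rather than documented Dovetail behaviour.

```python
from collections import Counter, defaultdict


class EventHub:
    """Hypothetical event hub: named events, listeners, cumulative counters."""

    def __init__(self):
        self._listeners = defaultdict(list)
        self._counts = Counter()

    def on(self, event, callback):
        self._listeners[event].append(callback)

    def emit(self, event, payload):
        self._counts[event] += 1
        # Iterate over a copy so listeners may register more listeners safely
        for callback in list(self._listeners[event]):
            callback(payload)

    def stats(self):
        return dict(self._counts)


def run_with_events(hub, func, *args, retries=0):
    """Run func, emitting queued/started/error/retries/done events around it."""
    payload = {"func": func.__name__}
    hub.emit("queued", payload)
    hub.emit("started", payload)
    for attempt in range(retries + 1):
        try:
            result = func(*args)
        except Exception as exc:
            hub.emit("error", {**payload, "error": exc})
            if attempt == retries:
                raise  # out of attempts: surface the last error
            hub.emit("retries", payload)
        else:
            hub.emit("done", payload)
            return result
```

Keeping the counters inside `emit` means the statistics stay accurate even when no listener is registered for an event.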
Registry (auto-registration)
`dovetail` now exposes a lightweight registry to track active `Dovetail` instances. By default, `Dovetail(..., auto_register=True)` registers itself with the package registry so applications can perform coordinated shutdown.

Public helpers:

    from dovetail import register, unregister, list_active, shutdown_all, set_app_shutdown_hook

- `register(dvt)`: explicitly register an instance (optional; instances auto-register by default).
- `unregister(dvt)`: remove from the registry (useful if you manage shutdown yourself).
- `list_active()`: return a list of active (live) instances.
- `shutdown_all(wait=True)`: best-effort shutdown of all registered instances (useful for app shutdown hooks).
- `set_app_shutdown_hook(fn)`: provide an application-level hook that the registry will call during process exit.

Notes:
- The registry uses weakrefs and logs a warning if a `Dovetail` is garbage-collected without an explicit `shutdown()` call.
- To opt out of auto-registration when creating a `Dovetail`, pass `auto_register=False` to the constructor.

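
A weakref-based registry with a "collected without shutdown" warning might look roughly like the sketch below. It reuses the helper names from the list above purely for readability; the `Resource` class, the `leaked` list, and the use of `weakref.WeakSet` plus `weakref.finalize` are assumptions, not Dovetail's actual internals.

```python
import logging
import weakref

log = logging.getLogger("registry_sketch")

# Weak references only: the registry never keeps an instance alive.
_active = weakref.WeakSet()
leaked = []  # names collected without shutdown(), kept for demonstration


def _warn_if_open(name, state):
    if not state["closed"]:
        leaked.append(name)
        log.warning("%s was garbage-collected without shutdown()", name)


def register(obj):
    _active.add(obj)


def unregister(obj):
    _active.discard(obj)


def list_active():
    return list(_active)


def shutdown_all():
    for obj in list_active():
        obj.shutdown()


class Resource:
    """Hypothetical object that expects an explicit shutdown() call."""

    def __init__(self, name, auto_register=True):
        self.name = name
        self._state = {"closed": False}
        # The finalizer holds the state dict, not self, so it does not
        # prevent the instance from being collected.
        weakref.finalize(self, _warn_if_open, name, self._state)
        if auto_register:
            register(self)

    def shutdown(self):
        self._state["closed"] = True
        unregister(self)
```

Passing shared state to the finalizer instead of the instance itself is the key detail: a finalizer that referenced `self` would keep the object alive and never fire.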
All dvt.events.on_* methods accept these optional scope parameters:
- `function_target`: only receive events for a specific callable.
- `instance_target`: only receive events for a specific execution id (available as `id` in the event payload).
- `allow_reentry=False`: prevents a listener from triggering itself recursively. Set to `True` to allow reentry, bounded by `max_chain_depth`.
- `max_chain_depth=5`: maximum active callback depth when reentry is enabled.
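
One way to picture the scope and reentry parameters is a wrapper that filters by target and tracks how many nested invocations are active. The `Listener` class and the `"func"` payload key below are hypothetical, and the sketch is single-threaded; how Dovetail actually tracks depth is not stated here.

```python
class Listener:
    """Hypothetical listener wrapper with a target filter and reentry guard."""

    def __init__(self, callback, function_target=None,
                 allow_reentry=False, max_chain_depth=5):
        self.callback = callback
        self.function_target = function_target
        self.allow_reentry = allow_reentry
        self.max_chain_depth = max_chain_depth
        self._depth = 0  # number of currently active (nested) invocations

    def __call__(self, payload):
        # Scope filter: ignore events that belong to other callables.
        if (self.function_target is not None
                and payload.get("func") is not self.function_target):
            return False
        # Reentry guard: a single active call unless reentry is allowed,
        # and never more than max_chain_depth nested calls.
        limit = self.max_chain_depth if self.allow_reentry else 1
        if self._depth >= limit:
            return False
        self._depth += 1
        try:
            self.callback(payload)
        finally:
            self._depth -= 1
        return True
```

The return value reports whether the callback actually ran, which makes dropped events easy to observe in tests.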
Full, expanded guides live in the repository under docs/detailed_guide.md.
Run tests locally:

    python -m pytest -q

Contributions welcome: open an issue or PR.

MIT. See CHANGELOG.md for release notes.