Skip to content

Commit

Permalink
Add asyncio helper for resolving task results asynchronously.
Browse files Browse the repository at this point in the history
Also add example script showing how to use this.
  • Loading branch information
coleifer committed Jun 23, 2023
1 parent 4737bdf commit 50f5a67
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 0 deletions.
31 changes: 31 additions & 0 deletions examples/simple/amain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""
Example script showing how you can use asyncio to read results.
"""
import asyncio
import time

from huey.contrib.asyncio import aget_result
from huey.contrib.asyncio import aget_result_group

from tasks import *


async def main():
s = time.time()
r1, r2, r3 = [slow(2) for _ in range(3)]
results = await asyncio.gather(
aget_result(r1),
aget_result(r2),
aget_result(r3))
print(results)
print(round(time.time() - s, 2))

# Using result group.
s = time.time()
results = await aget_result_group(slow.map([2, 2, 2]))
print(results)
print(round(time.time() - s, 2))


if __name__ == '__main__':
asyncio.run(main())
52 changes: 52 additions & 0 deletions huey/contrib/asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import asyncio

from huey.constants import EmptyData


async def aget_result(res, backoff=1.15, max_delay=1.0, preserve=False):
"""
Await a task result.
Example usage:
@huey.task()
def some_task(...):
...
# Call the task and get the normal result-handle.
rh = some_task(...)
# Asynchronously await the result of the task.
result = await aget_result(rh)
More advanced example of waiting for multiple results concurrently:
r1 = some_task(...)
r2 = some_task(...)
r3 = some_task(...)
# Asynchronously await the results of all 3 tasks.
results = await asyncio.gather(
aget_result(r1),
aget_result(r2),
aget_result(r3))
NOTE: the Redis operation will be a normal blocking socket read, but in
practice these will be super fast. The slow part is the necessity to call
`sleep()` between polling intervals (since the Redis command to read the
result does not block).
"""
delay = 0.1
while res._result is EmptyData:
delay = min(delay, max_delay)
if res._get(preserve) is EmptyData:
await asyncio.sleep(delay)
delay *= backoff
return res._result


async def aget_result_group(rg, *args, **kwargs):
return await asyncio.gather(*[
aget_result(r, *args, **kwargs)
for r in rg])

0 comments on commit 50f5a67

Please sign in to comment.