In [None]:
from __future__ import annotations

import asyncio
import time

from agently_stage import Stage


async def async_task(input_sentence: str):
    print("Start Simulate Async Network Request...")
    await asyncio.sleep(1)
    print("Response Return...")
    return f"Network Request Data: Your input is {input_sentence}"


def sync_task(a: int, b: int):
    print("Start Simulate Sync Long Time Task...")
    time.sleep(2)
    print("Task Done...")
    return f"Task Result: {a + b}"


stage = Stage()
async_response = stage.go(async_task, "Agently Stage is awesome!")
sync_response = stage.go(sync_task, 1, 2)
stage.close()
# Try remove this line below, it'll work perfectly too.
print(async_response.get(), "|", sync_response.get())

Start Simulate Sync Long Time Task...
Start Simulate Async Network Request...
Response Return...


KeyboardInterrupt: 

Task Done...


In [None]:
from agently_stage import Stage

with Stage() as stage:

    async def async_task(input_sentence: str):
        print("Start Simulate Async Network Request...")
        await asyncio.sleep(1)
        print("Response Return...")
        return f"Network Request Data: Your input is {input_sentence}"

    def sync_task(a: int, b: int):
        print("Start Simulate Sync Long Time Task...")
        time.sleep(2)
        print("Task Done...")
        return f"Task Result: {a + b}"

    async_response = stage.go(async_task, "Agently Stage is awesome!")
    sync_response = stage.go(sync_task, 1, 2)

print(async_response.get(), "|", sync_response.get())

Start Simulate Sync Long Time Task...
Start Simulate Async Network Request...
Response Return...
Task Done...
Network Request Data: Your input is Agently Stage is awesome! | Task Result: 3


In [None]:
from agently_stage import Stage

stage = Stage()


@stage.func
def task(sentence: str):
    time.sleep(1)
    return f"Done: {sentence}"


# Defined but hasn't run
task

# Start running
task.go("First")
# or just `task()` like call a function normally

# Block current thread and wait until done to get result
result = task.wait()
print(result)

# Wait again won't restart it
result_2 = task.wait()
print(result_2)

# Reset make the function can be started again
task.reset()
task("Second")
result_3 = task.wait()
print(result_3)

Done: First
Done: First
Done: Second


In [None]:
from agently_stage import Stage

# We create a handler in one dispatch
with Stage() as stage_1:

    @stage_1.func
    async def handler(sentence):
        return f"Someone said: {sentence}"


# We wait this handler in another dispatch
with Stage() as stage_2:

    def waiting():
        result = handler.wait()
        print(result)

    stage_2.go(waiting)
    # Some uncertain time later, the handler is called
    time.sleep(1)

    async def executor():
        await asyncio.sleep(1)
        handler("StageFunction is useful!")

    stage_2.go(executor)

Someone said: StageFunction is useful!


In [None]:
from agently_stage import Stage

with Stage() as stage:

    async def start_gen(n: int):
        for i in range(n):
            await asyncio.sleep(1)
            yield i + 1

    gen = stage.go(start_gen, 5)
    for item in gen:
        print(item)

1
2
3
4
5


In [None]:
from agently_stage import Stage

with Stage() as stage:

    async def start_gen(n: int):
        for i in range(n):
            await asyncio.sleep(1)
            yield i + 1

    gen = stage.go(start_gen, 5)
    result_list = gen.get()
    print(result_list)

[1, 2, 3, 4, 5]


In [None]:
import time

from agently_stage import Stage

with Stage() as stage:

    async def start_gen(n: int):
        for i in range(n):
            await asyncio.sleep(1)
            yield i + 1
        raise Exception("Some Error")

    async def times_item(item: int):
        await asyncio.sleep(0.1)
        return item * 2

    gen = stage.go(
        start_gen,
        5,
        lazy=True,  # <- Set generator as lazy mode
        # Define runtime handler to consume origin yielded value and re-yield the final value
        on_success=times_item,
        # Define runtime handler to consume raised exception and re-yield the final value or exception
        on_error=lambda e: str(e),
    )
    time.sleep(5)
    # Generator function `start_gen` start here only when `gen` is iterated
    for item in gen:
        print(item)
    print(gen.get())

2
4
6
8
10
Some Error
[2, 4, 6, 8, 10, 'Some Error']


In [None]:
import time

from agently_stage import Stage, Tunnel

tunnel = Tunnel()

with Stage() as stage:

    def consumer():
        print("GO CONSUMER")
        time.sleep(1)
        # You can use `tunnel.get_gen()` to get a `StageHybridGenerator` from tunnel instance
        gen = tunnel.get_generator()
        for data in gen:
            print("streaming:", data)

    async def async_consumer():
        print("GO A CONSUMER")
        # Or you can just iterate over tunnel data by `for`/`async for`
        async for data in tunnel:
            print("async streaming:", data)

    async def provider(n: int):
        print("GO PROVIDER")
        for i in range(n):
            tunnel.put(object)
            await asyncio.sleep(0.1)
        # If you forget to .put_stop(), tunnel will close after 10s by default
        tunnel.put_stop()

    # State consumer first
    stage.go(consumer)
    stage.go(async_consumer)
    # Provider start providing data sometime later
    time.sleep(1)
    stage.get(provider, 5)

# You can also use `tunnel.get()` to get a final yielded item list
print(tunnel.get()[0])

GO CONSUMER
GO A CONSUMER
GO PROVIDER
async streaming: <object object at 0x1035b25a0>
async streaming: <object object at 0x1035b2590>
async streaming: <object object at 0x1035b26e0>
async streaming: <object object at 0x1035b25d0>
async streaming: <object object at 0x1035b2630>
streaming: <object object at 0x1035b25a0>
streaming: <object object at 0x1035b2590>
streaming: <object object at 0x1035b26e0>
streaming: <object object at 0x1035b25d0>
streaming: <object object at 0x1035b2630>
<object object at 0x1035b25a0>


In [None]:
from agently_stage import EventEmitter, Stage

emitter = EventEmitter()


async def listener(data):
    print(f"I got: {data}")
    # You can return value to emitter
    return True


emitter.on("data", listener)

with Stage() as stage:
    # Submit task that wait to run later
    stage.go(lambda: emitter.emit("data", "EventEmitter is Cool!"))

responses = emitter.emit("data", "I'll say it again, EventEmitter is Cool!")

# Get responses from all event listeners
for response in responses:
    print(response.get())

I got: I'll say it again, EventEmitter is Cool!
True


I got: EventEmitter is Cool!


In [None]:
# Stress Testing
import random
import threading
import time

from agently_stage import Stage

counter = {}
counter_lock = threading.Lock()
stage = Stage(max_workers=None)  # <-Modify max workers of thread pool executor here
test_times = 1000
# Each time submit 1 sync task, 1 async task,
# 1 sync callback task and 1 async callback


# Simulation Task
def sample_task(task_id):
    """Simulate sync task with random delay"""
    # print(f"Sync Task {task_id} started")
    time.sleep(random.uniform(0.1, 0.5))
    with counter_lock:
        counter[task_id] = True
        done_count = sum(counter.values())
    print(f"Sync Task {task_id} completed", "Process:", done_count, "/", len(counter.keys()))


async def sample_async_task(task_id):
    """Simulate async task with random delay"""
    # print(f"Async Task {task_id} started")
    await asyncio.sleep(random.uniform(0.1, 0.5))
    with counter_lock:
        counter[task_id] = True
        done_count = sum(counter.values())
    print(f"Async Task {task_id} completed", "Process:", done_count, "/", len(counter.keys()))
    return task_id


# Callback
def on_success(task_id):
    # print(f"{task_id} Sync Callback")
    with counter_lock:
        done_count = sum(counter.values())
    if done_count == test_times * 2:
        print_finish_time()


async def async_on_success(task_id):
    # print(f"{task_id} Async Callback")
    with counter_lock:
        done_count = sum(counter.values())
    if done_count == test_times * 2:
        print_finish_time()


# Start time
start_time = time.time()
print("Start:", start_time)


# Finish Time
def print_finish_time():
    finish_time = time.time()
    print("End:", finish_time, "Use:", finish_time - start_time)


# Submit Tasks
for i in range(test_times):
    stage.go(sample_task, str(i), on_success=async_on_success)
    counter.update({str(i): False})
    stage.go(sample_async_task, "a" + str(i), on_success=on_success)
    counter.update({("a" + str(i)): False})

Start: 1740360784.834315


Sync Task 0 completedSync Task 14 completed Process: 2 / 2000
 Process: 1 / 2000
Async Task a67 completed Process: 3 / 2000
Sync Task 18 completed Process: 4 / 2000
Async Task a71 completed Process: 5 / 2000
Sync Task 16 completed Process: 6 / 2000
Async Task a15 completed Process: 7 / 2000
Async Task a83 completed Process: 8 / 2000
Sync Task 9 completed Process: 9 / 2000
Async Task a32 completed Process: 10 / 2000
Async Task a60 completed Process: 11 / 2000
Async Task a45 completed Process: 12 / 2000
Async Task a22 completed Process: 13 / 2000
Async Task a151 completed Process: 14 / 2000
Async Task a27 completed Process: 15 / 2000
Async Task a120 completed Process: 16 / 2000
Async Task a535 completed Process: 17 / 2000
Async Task a422 completed Process: 18 / 2000
Async Task a530 completed Process: 19 / 2000
Async Task a366 completed Process: 20 / 2000
Async Task a104 completed Process: 21 / 2000
Async Task a720 completed Process: 22 / 2000
Async Task a883 completed Process: 23 / 2000
