In [1]:
# Imports for the example
from datetime import datetime
import time

# Imports

In [2]:
from taskblocks import TaskBlock, AsyncEvent, AsyncQueue

# Create a TaskBlock

In [3]:
class ExampleTaskBlock(TaskBlock):

    def construct_io(self) -> None:
        """Constructs the I/O for this TaskBlock."""
        # Inputs
        self.inputs.queues["main_input"] = AsyncQueue()

        # Outputs
        self.outputs.events["setup_check"] = AsyncEvent()
        self.outputs.events["teardown_check"] = AsyncEvent()

        self.outputs.queues["main_output"] = AsyncQueue()

    def setup(self) -> None:
        """This method runs before the task loop only once."""
        # Set an event when this TaskBlock starts
        self.outputs.events["setup_check"].set()

    async def task(self) -> None:
        """This method will run either once or indefinitely depending on if "running" or "starting" the TaskBlock"""
        # Try and get an item from the queue.
        try:
            in_item = await self.inputs.queues["main_input"].get_async()

            if in_item == 0:
                self.loop_event.clear()
                return
        except InterruptedError:
            return  # If interrupted, skip this loop of the task.

        print(f"{datetime.now().strftime('%H:%M:%S.%f')}: Received {in_item} in task")

        # Do something to transform the input item to make an output item.
        out_item = in_item + 1

        # Wait to simulate a long compute time
        time.sleep(2)

        # Put the resulting item on the queue.
        await self.outputs.queues["main_output"].put_async(out_item)

    def teardown(self) -> None:
        """This method runs after the task loop only once."""
        # Set an event when this TaskBlock end
        self.outputs.events["teardown_check"].set()

    def stop(self) -> None:
        """Stops the task."""
        super().stop()  # Turn off the task loop.
        self.inputs.queues["main_input"].get_interrupt.set()  # Interrupt the task so it can escape out.

In [4]:
# Create TaskBlock object
task = [None] * 4
input_q = AsyncQueue()
output_q = AsyncQueue()
for i in range(4):
    task[i] = t = ExampleTaskBlock(is_process=True)
    t.inputs.queues["main_input"] = input_q
    t.outputs.queues["main_output"] = output_q

# Start TaskBlock in separate process and print when setup is complete
print(f"{datetime.now().strftime('%H:%M:%S.%f')}: Setup event is {task.outputs.events['setup_check'].is_set()}")

task.start()  # Will continuously execute the task until its stop method is called from any process.
task.outputs.events["setup_check"].wait()

print(f"{datetime.now().strftime('%H:%M:%S.%f')}: Setup event is {task.outputs.events['setup_check'].is_set()}\n")


# Put some data on the queue and let the TaskBlock process it
print(f"{datetime.now().strftime('%H:%M:%S.%f')}: Putting items on the queue")

task.inputs.queues["main_input"].put(1)
task.inputs.queues["main_input"].put(2)

print(f"{datetime.now().strftime('%H:%M:%S.%f')}: Items on the queue")
print(f"{datetime.now().strftime('%H:%M:%S.%f')}: Waiting for results")
out = task.outputs.queues["main_output"].get()
print(f"{datetime.now().strftime('%H:%M:%S.%f')}: {out} was returned from the TaskBlock")
out = task.outputs.queues["main_output"].get()
print(f"{datetime.now().strftime('%H:%M:%S.%f')}: {out} was returned from the TaskBlock\n")


# Tells the TaskBlock to stop and print when teardown is complete
print(f"{datetime.now().strftime('%H:%M:%S.%f')}: Teardown event is {task.outputs.events['teardown_check'].is_set()}")

task.stop()
task.outputs.events["teardown_check"].wait()

print(f"{datetime.now().strftime('%H:%M:%S.%f')}: Teardown event is {task.outputs.events['teardown_check'].is_set()}\n")


11:02:20.699423: Setup event is False
11:02:20.743160: Received 1 in task
11:02:20.740905: Setup event is True

11:02:20.741170: Putting items on the queue
11:02:20.743187: Items on the queue
11:02:20.743288: Waiting for results
11:02:22.749402: Received 2 in task
11:02:22.751237: 2 was returned from the TaskBlock
11:02:24.771335: 3 was returned from the TaskBlock

11:02:24.771459: Teardown event is False
11:02:24.771985: Teardown event is True

