In [1]:
# 线程池测试

from workflow.threading_pool import ThreadPool
import time

def example_task(x):
    time.sleep(2)  # 模拟耗时操作
    return x * x

# 初始化线程池
pool = ThreadPool(max_workers=4)

# 提交任务
futures = [pool.submit(example_task, i) for i in range(10)]

# 获取结果
for i, future in enumerate(futures):
    print(f"Task {i} result: {future.result()}")

# 关闭线程池
pool.shutdown()



In [1]:
# 变量池测试

from workflow.graph_engine.entities.variable_pool import VariablePool
from workflow.graph_engine.entities.variable import Variable

vp = VariablePool()
vp.add(["开始", "query"], Variable(name="query", value="Hello", selector=["开始", "query"]))

print(vp.get(["开始", "query"]).value)

Hello


In [1]:
from workflow.graph_engine.entities.workflow_graph import WorkflowGraph
from workflow.nodes.node_mapping import NodeType
from workflow.graph_engine.graph_engine import WorkflowRunner
from workflow.graph_engine.entities.variable_pool import VariablePool
from workflow.graph_engine.entities.variable import Variable
from concurrent.futures import ThreadPoolExecutor

graph = WorkflowGraph([])
graph.add_node("0", NodeType.START, {})
graph.add_node("1", NodeType.LLM, {})
graph.add_node("2", NodeType.LLM, {})
graph.add_node("3", NodeType.LLM, {})
graph.add_node("4", NodeType.END, {})
graph.add_edge("0", "1")
graph.add_edge("0", "2")
graph.add_edge("1", "3")
graph.add_edge("2", "4")
graph.add_edge("3", "4")

engine = WorkflowRunner(graph, ThreadPoolExecutor(max_workers=4))

engine.variable_pool.add(["1", "prompt"], Variable(name="prompt", value="Hello, who are you", selector=["1", "prompt"]))
engine.variable_pool.add(["2", "prompt"], Variable(name="prompt", value="Hello there", selector=["2", "prompt"]))
engine.variable_pool.add(["3", "prompt"], Variable(name="prompt", value="How are you", selector=["2", "prompt"]))
engine.run("0")


0->[]
Node 0 is running...
Node 0 finished in 1.0012121200561523 seconds.
1->[True]
2->[True]
Node 1 is running...
Node 2 is running...


Hello 👋! I'm here to help you with any questions or tasks you have. How can I assist you today?
Node 2 finished in 2.1314477920532227 seconds.
4->[True, False]
Hello, I am ChatGLM, a language model jointly trained by KEG Lab of Tsinghua University and Zhipu.AI Company. My job is to provide appropriate answers and support to users' questions and requests.
Node 1 finished in 2.5852205753326416 seconds.
3->[True]
Node 3 is running...
I'm fine, thank you! How can I assist you today?
Node 3 finished in 2.1023967266082764 seconds.
4->[True, True]
Node 4 is running...
Node 4 finished in 1.000960350036621 seconds.


In [2]:
engine.node_states

{'0': <NodeRunStatus.SUCCEEDED: 'SUCCEEDED'>}

In [5]:
print(engine.variable_pool.get(["1", "output"]))

Hello! I am ChatGLM, an artificial intelligence assistant based on the GLM model, which is jointly trained by Knowledge Engineering Group, Tsinghua University and Zhipu.AI Company. My job is to provide appropriate answers and support to users' questions and requests.


In [1]:
# 事件队列测试

import threading
import queue
import time
from typing import Any, Callable


# 1. 事件定义
class Event:
    """基础事件类"""
    def __init__(self, event_type: str, data: Any = None):
        self.event_type = event_type
        self.data = data

    def __repr__(self):
        return f"Event(type={self.event_type}, data={self.data})"


# 2. 事件队列
class EventQueue:
    """事件队列"""
    def __init__(self):
        self.queue = queue.Queue()  # 线程安全队列

    def publish(self, event: Event):
        """将事件放入队列"""
        self.queue.put(event)

    def consume(self) -> Event:
        """从队列中取出事件"""
        return self.queue.get()


# 3. 事件处理器
class EventHandler:
    """事件处理器"""
    def __init__(self):
        self.handlers = {}  # 存储事件类型与处理函数的映射

    def register_handler(self, event_type: str, handler: Callable[[Event], None]):
        """注册事件处理函数"""
        self.handlers[event_type] = handler

    def handle_event(self, event: Event):
        """处理事件"""
        handler = self.handlers.get(event.event_type)
        if handler:
            handler(event)
        else:
            print(f"No handler for event type: {event.event_type}")


# 4. 事件生产者与消费者
def producer(event_queue: EventQueue, event_types: list[str], interval: float):
    """事件生产者"""
    for i in range(10):
        event_type = event_types[i % len(event_types)]
        event = Event(event_type=event_type, data={"count": i})
        print(f"[Producer] Generated: {event}")
        event_queue.publish(event)
        time.sleep(interval)


def consumer(event_queue: EventQueue, event_handler: EventHandler):
    """事件消费者"""
    while True:
        event = event_queue.consume()
        print(f"[Consumer] Processing: {event}")
        event_handler.handle_event(event)


# 主程序
if __name__ == "__main__":
    # 创建事件队列和处理器
    event_queue = EventQueue()
    event_handler = EventHandler()

    # 注册事件处理函数
    def handle_event_type_a(event: Event):
        print(f"Handled Event A with data: {event.data}")

    def handle_event_type_b(event: Event):
        print(f"Handled Event B with data: {event.data}")

    event_handler.register_handler("EventA", handle_event_type_a)
    event_handler.register_handler("EventB", handle_event_type_b)

    # 创建生产者线程
    producer_thread = threading.Thread(
        target=producer, 
        args=(event_queue, ["EventA", "EventB"], 1),
        daemon=True
    )

    # 创建消费者线程
    consumer_thread = threading.Thread(
        target=consumer,
        args=(event_queue, event_handler),
        daemon=True
    )

    # 启动线程
    producer_thread.start()
    consumer_thread.start()

    # 主线程等待一段时间后退出
    time.sleep(12)
    print("Main thread exiting...")


[Producer] Generated: Event(type=EventA, data={'count': 0})
[Consumer] Processing: Event(type=EventA, data={'count': 0})
Handled Event A with data: {'count': 0}
[Producer] Generated: Event(type=EventB, data={'count': 1})
[Consumer] Processing: Event(type=EventB, data={'count': 1})
Handled Event B with data: {'count': 1}
[Producer] Generated: Event(type=EventA, data={'count': 2})
[Consumer] Processing: Event(type=EventA, data={'count': 2})
Handled Event A with data: {'count': 2}
[Producer] Generated: Event(type=EventB, data={'count': 3})
[Consumer] Processing: Event(type=EventB, data={'count': 3})
Handled Event B with data: {'count': 3}
[Producer] Generated: Event(type=EventA, data={'count': 4})
[Consumer] Processing: Event(type=EventA, data={'count': 4})
Handled Event A with data: {'count': 4}
[Producer] Generated: Event(type=EventB, data={'count': 5})
[Consumer] Processing: Event(type=EventB, data={'count': 5})
Handled Event B with data: {'count': 5}
[Producer] Generated: Event(type=E

In [5]:
import time

def gen():
    for i in range(10):
        time.sleep(1)
        yield i
        
generator = gen()
for i in generator:
    print(i)

0
1
2
3
4
5
6
7
8
9


In [2]:
import requests

res = requests.post("http://localhost:5000/api/workflow/1")

print(res.json())

{'message': 'Workflow executed successfully'}


In [6]:
import requests

res = requests.get("http://localhost:5000/api/graph/1")

print(res.json())

JSONDecodeError: Expecting value: line 1 column 1 (char 0)