# 参数解析

在 Bridgic 中，一个执行单元（worker）不仅可以通过 **结果分发** 机制控制其结果如何传递给直接依赖于它的 workers，还可以通过 **参数绑定** 机制控制它如何接收来自前置 workers 的结果。此外，这些机制还决定了一个 automa 的输入参数如何分配给其内部 workers。这两个机制在 Bridgic 中统称为 **参数解析**。

## 结果分发

结果分发是一种用于将数据从一个 worker 分配到多个直接依赖于它的 workers，或从 automa 的输入分配到其起始 workers 的机制。让我们通过一个实际的例子来理解这个特性。

假设我们需要并行处理多个用户查询。每个查询需要经过预处理、分析，然后进行聚合。我们可以使用结果分发机制来有效地处理这种情况。

### 1. 初始化

让我们开始导入必要的包。

In [None]:
from typing import Tuple
from bridgic.core.automa import GraphAutoma, worker
from bridgic.core.automa.args import InOrder, ResultDispatchingRule

### 2. 处理每个输入

首先，让我们了解如何在调用 [`arun()`](../../../../reference/bridgic-core/bridgic/core/automa/#bridgic.core.automa.GraphAutoma.arun) 时使用 [`InOrder`](../../../../reference/bridgic-core/bridgic/core/automa/args/#bridgic.core.automa.args.InOrder)。当您有多个起始 Worker 并希望将不同的输入值分配给它们时，可以将输入数据包装在 `InOrder()` 中。

In [2]:
class ParallelProcessing(GraphAutoma):
    @worker(is_start=True)
    async def process_query_1(self, user_input: int) -> int:
        print(f"Processing query 1 with input: {user_input}")
        return user_input * 2  # 2
    
    @worker(is_start=True)
    async def process_query_2(self, user_input: int) -> int:
        print(f"Processing query 2 with input: {user_input}")
        return user_input * 3  # 6
    
    @worker(dependencies=["process_query_1", "process_query_2"], is_output=True)
    async def aggregate_results(self, result1: int, result2: int) -> int:
        print(f"Aggregating: {result1} + {result2}")
        return result1 + result2  # 8


现在让我们使用 `InOrder` 来将不同的值分配给两个起始 Worker：

In [None]:
automa = ParallelProcessing()
res = await automa.arun(user_input=InOrder([1, 2]))
print(res)

Processing query 1 with input: 1
Processing query 2 with input: 2
Aggregating: 2 + 6
8


太好了！正如您所看到的，`InOrder([1, 2])` 将值 `1` 和 `2` 分别分配给 `process_query_1` 和 `process_query_2`，这是基于启动 Worker 的声明顺序。

> **注意**：`InOrder()` 中列表/元组的长度必须与接受相应参数的启动 Worker 的数量匹配。否则，将引发错误。

<div style="text-align: center; margin: 2rem 0;">
<hr style="border: none; border-top: 2px solid #e2e8f0;">
</div>

### 3. 处理结果分发中的每个项目

现在让我们了解如何使用 [`result_dispatching_rule=ResultDispatchingRule.IN_ORDER`](../../../../reference/bridgic-core/bridgic/core/automa/args/#bridgic.core.automa.args.ResultDispatchingRule) 将 Worker 的输出分发给多个下游 Worker。

当一个 Worker 设置 `result_dispatching_rule=ResultDispatchingRule.IN_ORDER` 时，它的返回值必须是一个可迭代对象（元组或列表）。返回值中的每个元素将按照声明或添加的顺序分发给直接依赖于该 Worker 的下游 Worker。

In [8]:
class ResultDispatchingExample(GraphAutoma):
    @worker(is_start=True, result_dispatching_rule=ResultDispatchingRule.AS_IS)
    async def generate_data(self, user_input: int) -> int:
        return user_input
    
    @worker(dependencies=["generate_data"], result_dispatching_rule=ResultDispatchingRule.IN_ORDER)
    async def process_and_split(self, data: int) -> Tuple[int, int]:
        print(f"Processing data: {data}")
        # Return a tuple that will be distributed to downstream workers
        return data + 1, data + 2  # (2, 3)
    
    @worker(dependencies=["process_and_split"])
    async def worker_a(self, value: int) -> int:
        print(f"Worker A received: {value}")
        return value * 10  # 20
    
    @worker(dependencies=["process_and_split"])
    async def worker_b(self, value: int) -> int:
        print(f"Worker B received: {value}")
        return value * 20  # 60
    
    @worker(dependencies=["worker_a", "worker_b"], is_output=True)
    async def combine_results(self, result_a: int, result_b: int) -> int:
        print(f"Combining: {result_a} + {result_b}")
        return result_a + result_b  # 80


当 `process_and_split` 返回 `(2, 3)` 时，我们希望将这些元素分别分配给 `worker_a` 和 `worker_b`。请注意，`result_dispatching_rule` 应该在 **生成** 结果的 worker 上设置，而不是在 **接收** 结果的 worker 上。

现在让我们运行它：

In [9]:
automa = ResultDispatchingExample()
res = await automa.arun(user_input=1)
print(res)

Processing data: 1
Worker A received: 2
Worker B received: 3
Combining: 20 + 60
80


完美！正如您所看到的：

1. `process_and_split` 返回了 `(2, 3)`，并且 `result_dispatching_rule=ResultDispatchingRule.IN_ORDER`。
2. 第一个值 `2` 被分配给 `worker_a`（第一个下游工作者）。
3. 第二个值 `3` 被分配给 `worker_b`（第二个下游工作者）。
4. 两个工作者处理了他们的值，结果被组合在一起。

> **重要说明**：
> - 设置 `result_dispatching_rule=ResultDispatchingRule.IN_ORDER` 的工作者必须返回一个可迭代对象（元组或列表）。
> - 返回的可迭代对象的长度必须与直接依赖于该工作者的下游工作者数量相匹配。
> - 分配顺序遵循下游工作者在图中声明的顺序。

`ResultDispatchingRule` 有两种模式：
- **AS_IS**：工作者的结果将作为整体发送给每个依赖于它的下游工作者。这是默认行为。
- **IN_ORDER**：工作者将按照下游工作者声明或添加的顺序，依次将当前结果分配给相应的工作者。

## 参数绑定

参数绑定是一种机制，指定工作者如何从其直接前驱、非直接前驱或其所属的自动机的输入参数中接收参数。完成参数绑定过程有三种方法，包括 **Arguments Mapping**、**Arguments Injection** 和 **Inputs Propagation**。现在让我们通过一个示例来理解它们。

<br>
<div style="text-align: center;">
<img src="../../../imgs/Parameter_Passing.png" alt="参数传递" width="800" height="600">
</div>
<br>

## 查询扩展

查询扩展是 RAG 中的一个常见步骤，可以增强 RAG 的质量。为了提高查询扩展的质量，开发人员通常首先从查询中提取实体信息，并利用这些信息来帮助模型扩展原始查询。

现在让我们实现这一点。用户输入原始查询，然后我们扩展查询以获得更多查询。完成查询扩展有三个步骤：

1. 接收用户的输入并进行预处理以获取原始查询。
2. 从查询中提取实体信息以获取实体信息。
3. 扩展并获得多个查询。

### 1. 初始化

在我们开始之前，让我们准备运行环境。在本教程中，我们将使用支持 [`StructuredOutput`](../../../../reference/bridgic-core/bridgic/core/model/protocols/#bridgic.core.model.protocols.StructuredOutput) 特性的 [OpenAI 模型集成](../../../../reference/bridgic-llms-openai/bridgic/llms/openai/#bridgic.llms.openai.OpenAILlm)（不是 OpenAI 类似的）。运行以下 `pip` 命令以确保此集成可用。

```shell
pip install bridgic-llms-openai
```

In [None]:
# Get the environment variables.
import os

_api_key = os.environ.get("OPENAI_API_KEY")
_api_base = os.environ.get("OPENAI_API_BASE")
_model_name = os.environ.get("OPENAI_MODEL_NAME")

# Import the necessary packages.
from pydantic import BaseModel, Field
from typing import List, Dict, Tuple
from bridgic.core.automa import GraphAutoma, worker
from bridgic.core.automa.args import ArgsMappingRule
from bridgic.core.model.types import Message, Role
from bridgic.core.model.protocols import PydanticModel
from bridgic.llms.openai import OpenAILlm, OpenAIConfiguration

llm = OpenAILlm(  # the llm instance
    api_base=_api_base,
    api_key=_api_key,
    timeout=5,
    configuration=OpenAIConfiguration(model=_model_name),
)

### 2. 完整查询扩展

现在让我们实现这个查询扩展。我们假设收到的用户查询是 JSON 格式。它包含三个键：
1. `id`: 一个字符串，表示谁输入了查询。
2. `query`: 一个字符串，形式为 `Q: user_query`，表示用户输入的问题。
3. `date`: 用户输入查询的时间。

In [None]:
query_obj = {
    "id": "user_1",
    "query": "Q: What new developments have there been in RAG in the past year?",
    "date": "2025-09-30"
}

此外，我们定义当模型完成实体提取和查询扩展时，它将结果以 Pydantic 数据结构返回。

In [5]:
class EntityList(BaseModel):  # The expected format of the model output in the extract_entity worker
    entities: List[str] = Field(description="All entities in the input.")

class QueryList(BaseModel):  # The expected format of the model output in the expand_query worker
    queries: List[str] = Field(description="All queries in the input.")

接下来，让我们完成查询扩展的三个步骤，以实现我们的目标：

In [None]:
class QueryExpansion(GraphAutoma):
    @worker(is_start=True)
    async def pre_query(self, query_obj: Dict):  # Receive the user's input and preprocess query
        query = query_obj["query"].split(":")[1].strip()  
        return query

    @worker(is_start=True)
    async def pre_date(self, query_obj: Dict):  # Receive the user's input and preprocess date
        date = query_obj["date"]
        return date

    @worker(dependencies=["pre_query", "pre_date"], args_mapping_rule=ArgsMappingRule.AS_IS)
    async def extract_entity(self, query: str, date: str):  # Extract the entity information from the question, get entity information.
        response: EntityList = await llm.astructured_output(  
            constraint=PydanticModel(model=EntityList),
            messages=[
                Message.from_text(text="extract the entity information from the given query", role=Role.SYSTEM),
                Message.from_text(text=query, role=Role.USER,),
            ]
        )
        return query, response.entities, date

    @worker(dependencies=["extract_entity"], is_output=True, args_mapping_rule=ArgsMappingRule.AS_IS)
    async def expand_query(self, query_meta: Tuple[str, List[str]]):  # Expand and obtain multiple queries.
        query, entities, date = query_meta
        task_input = f"Query: {query}\nEntities: {entities}"
        response: QueryList = await llm.astructured_output(
            constraint=PydanticModel(model=QueryList),
            messages=[
                Message.from_text(text=f"Centered around the given entities and given date {date}, expand the query to obtain multiple queries", role=Role.SYSTEM),
                Message.from_text(text=task_input, role=Role.USER,),
            ]
        )
        return response.queries

让我们运行它！

In [7]:
query_expansion = QueryExpansion()
await query_expansion.arun(query_obj)

['What are the latest advancements in Retrieval-Augmented Generation (RAG) technology as of 2025?',
 'What new developments have emerged in RAG systems over the past 12 months?',
 'How have RAG implementations evolved in the last year?',
 'What innovations in RAG have been introduced between 2024 and 2025?',
 'What are the key breakthroughs in RAG technology in 2025?',
 'What new features or improvements have been added to RAG models in the past year?',
 'How has the performance of RAG systems improved in the last 12 months?',
 'What are the most recent trends and developments in RAG research and deployment?',
 'What new techniques have been introduced in RAG to improve accuracy and efficiency in 2025?',
 'What are the major updates in RAG frameworks and tools from 2024 to 2025?']

太好了！我们已经成功完成了查询扩展的小模块。

<div style="text-align: center; margin: 2rem 0;">
<hr style="border: none; border-top: 2px solid #e2e8f0;">
</div>

## 我们学到了什么？

### 结果分发

结果分发提供了强大的并行/并发数据处理机制：

- **`Distribute` in `arun()`**: 将输入值逐元素分发给多个起始 Worker
- **`ResultDispatchRule`**: 允许 Worker 将其输出（必须是可迭代的）分发给多个下游 Worker
    - **AS_IS**: 默认模式，将当前 Worker 的结果整体发送
    - **Distribute**: 设置为根据下游 Worker 声明或添加的顺序发送当前 Worker 的结果

这些功能使得在复杂工作流中实现高效的并行/并发处理和数据流管理成为可能。

### 参数映射

查看代码，我们发现每个 [`@worker`](../../../../reference/bridgic-core/bridgic/core/automa/#bridgic.core.automa.worker._worker_decorator.worker) 装饰器都有一个 `args_mapping_rule` 参数。

`args_mapping_rule` 定义了数据在直接依赖的 Worker 之间传递的方式，即前一个 Worker 的结果如何映射到下一个 Worker 的参数。其值只能通过 [`ArgsMappingRule`](../../../../reference/bridgic-core/bridgic/core/automa/args/#bridgic.core.automa.args.ArgsMappingRule) 的属性来指定。

#### AS_IS 模式（默认）

在 AS_IS 模式下，Worker 将按照依赖声明的顺序接收所有直接依赖 Worker 的输出作为输入参数。

在上述示例中，`extract_entity` 声明了依赖关系：`dependencies=["pre_query", "pre_date"]`，因此前两个 Worker 的结果将按照依赖声明的顺序映射到 `extract_entity` 的第一个和第二个参数，`pre_query` 的结果映射到 `query` 参数，`pre_date` 的结果映射到 `date` 参数。

> 注意：
> 依赖中的声明顺序仅影响参数映射的顺序，但不影响依赖 Worker 的执行顺序。

<br>
<div style="text-align: center;">
<img src="../../../imgs/args_mapping_AS_IS.png" alt="参数传递" width="600" height="400">
</div>
<br>

此外，如果前一个 Worker 返回多个值的结果，例如 `return x, y`，那么所有结果将作为一个元组结果传递。因此在上述示例中，`expand_query` 的参数 `query_meta` 接收了来自 `extract_entity` 的所有结果值。

#### UNPACK 模式

让我们回到之前的示例。在 `expand_query` 中，我们以 `AS_IS` 模式接收来自前一个 Worker 的参数，并将它们整体手动解包，如下所示：

In [None]:
@worker(dependencies=["extract_entity"], is_output=True, args_mapping_rule=ArgsMappingRule.AS_IS)
async def expand_query(self, query_meta: Tuple[str, List[str]]):  # Expand and obtain multiple queries.
    query, entities, date = query_meta
    ...

此操作需要了解 `query_meta` 的所有参数，这可能看起来不太方便。我们能否在返回时完成解包操作并填充相应的参数？此时，`UNPACK` 模式派上用场。

让我们修改上面示例中的 `expand_query` 并添加一些打印消息。

In [None]:
class QueryExpansion(GraphAutoma):
    @worker(is_start=True)
    async def pre_query(self, query_obj: Dict):  # Receive the user's input and preprocess query
        query = query_obj["query"].split(":")[1].strip()  
        return query

    @worker(is_start=True)
    async def pre_date(self, query_obj: Dict):  # Receive the user's input and preprocess date
        date = query_obj["date"]
        return date

    @worker(dependencies=["pre_query", "pre_date"], args_mapping_rule=ArgsMappingRule.AS_IS)
    async def extract_entity(self, query: str, date: str):  # Extract the entity information from the question, get entity information.
        response: EntityList = await llm.astructured_output(  
            constraint=PydanticModel(model=EntityList),
            messages=[
                Message.from_text(text="extract the entity information from the given query", role=Role.SYSTEM),
                Message.from_text(text=query, role=Role.USER,),
            ]
        )
        return query, response.entities, date

    @worker(dependencies=["extract_entity"], is_output=True, args_mapping_rule=ArgsMappingRule.UNPACK)
    async def expand_query(self, query: str, entities: List[str], date: str):  # Expand and obtain multiple queries.
        print(f"query: {query}, entities: {entities}, date: {date}")
        task_input = f"Query: {query}\nEntities: {entities}"
        response: QueryList = await llm.astructured_output(
            constraint=PydanticModel(model=QueryList),
            messages=[
                Message.from_text(text=f"Centered around the given entities and given date {date}, expand the query to obtain multiple queries", role=Role.SYSTEM),
                Message.from_text(text=task_input, role=Role.USER,),
            ]
        )
        return response.queries

让我们开始吧！

In [9]:
query_expansion = QueryExpansion()
await query_expansion.arun(query_obj)

query: What new developments have there been in RAG in the past year?, entities: ['RAG', 'new developments', 'past year'], date: 2025-09-30


['What are the latest advancements in Retrieval-Augmented Generation (RAG) technologies as of 2025?',
 'What new developments have emerged in RAG systems over the past 12 months?',
 'How has RAG evolved in the last year with regard to accuracy, efficiency, and scalability?',
 'What are the key innovations in RAG that have been introduced between 2024 and 2025?',
 'What new tools and frameworks have been launched for RAG implementation in the past year?',
 'What recent breakthroughs in RAG have improved context handling and retrieval precision?',
 'How have large language models integrated with RAG in the past year to enhance performance?',
 'What are the most significant updates in RAG-based applications from 2024 to 2025?',
 'What new techniques in RAG have been proposed to reduce hallucinations and improve factual consistency?',
 'How have RAG solutions adapted to real-time data retrieval and dynamic content updates in the past year?']

太好了！所有参数都已解包并接受。可以看出，`unpack` 模式使我们的任务流程更加清晰！

但是，需要注意的是，UNPACK 机制要求当前 worker **只能直接依赖一个 worker**；否则，在解包时多个 worker 的结果会混淆！

#### MERGE 模式

与此同时，反过来，由于存在 UNPACK 机制，是否也有可以聚合多个结果以供接收的机制？当一个 worker 收集多个依赖 worker 的结果时，这一点尤其有用。此时，`MERGE` 模式派上用场。

仍然参考上面的例子，`extract_entity` 实际上收到了来自两个 worker 的结果。现在让我们尝试让 `extract_entity` 在一个参数中接收所有这些结果，而不是接收两个参数。

让我们修改上面例子中的 `extract_entity` 并添加一些打印消息。

In [None]:
class QueryExpansion(GraphAutoma):
    @worker(is_start=True)
    async def pre_query(self, query_obj: Dict):  # Receive the user's input and preprocess query
        query = query_obj["query"].split(":")[1].strip()  
        return query

    @worker(is_start=True)
    async def pre_date(self, query_obj: Dict):  # Receive the user's input and preprocess date
        date = query_obj["date"]
        return date

    @worker(dependencies=["pre_query", "pre_date"], args_mapping_rule=ArgsMappingRule.MERGE)
    async def extract_entity(self, query_meta: Tuple[str, str]):  # Extract the entity information from the question, get entity information.
        print(f"query_meta: {query_meta}")
        query, date = query_meta
        response: EntityList = await llm.astructured_output(  
            constraint=PydanticModel(model=EntityList),
            messages=[
                Message.from_text(text="extract the entity information from the given query", role=Role.SYSTEM),
                Message.from_text(text=query, role=Role.USER,),
            ]
        )
        return query, response.entities, date

    @worker(dependencies=["extract_entity"], is_output=True, args_mapping_rule=ArgsMappingRule.UNPACK)
    async def expand_query(self, query: str, entities: List[str], date: str):  # Expand and obtain multiple queries.
        task_input = f"Query: {query}\nEntities: {entities}"
        response: QueryList = await llm.astructured_output(
            constraint=PydanticModel(model=QueryList),
            messages=[
                Message.from_text(text=f"Centered around the given entities and given date {date}, expand the query to obtain multiple queries", role=Role.SYSTEM),
                Message.from_text(text=task_input, role=Role.USER,),
            ]
        )
        return response.queries

让我们运行它！

In [11]:
query_expansion = QueryExpansion()
await query_expansion.arun(query_obj)

query_meta: ['What new developments have there been in RAG in the past year?', '2025-09-30']


['What are the latest advancements in Retrieval-Augmented Generation (RAG) technology as of 2025?',
 'What new developments have emerged in RAG systems over the past 12 months?',
 'How has RAG evolved in the last year with recent innovations in AI and NLP?',
 'What are the key updates and breakthroughs in RAG models from 2024 to 2025?',
 'What new features or improvements have been introduced in RAG implementations in the past year?',
 'What are the most significant RAG developments reported in 2025?',
 'How have retrieval and generation components in RAG been improved in the last year?',
 'What are the recent trends and new developments in RAG applications from 2024 to 2025?',
 'What innovations in RAG have been introduced by leading AI companies in the past year?',
 'What new challenges and solutions have emerged in RAG research over the last 12 months?']

很好！`extract_entity` 依赖于的结果已经从 Worker 收集到一个列表中，并传递给它的参数。

### 参数注入

回顾上面的例子，我们实际上发现 `date` 信息是通过 `pre_date`、`extract_entity` 传递的，最终到达 `expand_query`。然而，实际上 `extract_entity` 根本不使用这些信息。因此，在这里传递 `date` 似乎是多余的。而在 `expand_query` 中使用 `date` 实际上仅意味着数据依赖于它，但是否执行，这种控制依赖并不直接依赖于它。

> Bridgic 强调数据依赖和控制依赖的分离。这对于未来构建复杂图形是有益的，因为它允许解耦，并避免由于数据依赖的变化而需要调整整个图形。

在 Bridgic 中，我们可以使用参数注入来实现这一点。我们可以在声明参数时使用 `From` 标记来指示取哪个 Worker 的结果，同时如果没有获得结果，则设置默认值。例如：

In [None]:
# import the From marker
from bridgic.core.automa import From

@worker(dependencies=["extract_entity"], is_output=True, args_mapping_rule=ArgsMappingRule.UNPACK)
async def expand_query(self, query_meta: Tuple[str, str], date: str = From("pre_date", "2025-01-01")):  # Expand and obtain multiple queries.
    ...

`date: str = From("pre_date", "2025-01-01")` 表示 `date` 的值将根据 `pre_date` Worker 的结果进行赋值。如果该 Worker 的结果尚未生成，则将使用默认值。

> 如果 pre_date Worker 不存在，或者 pre_date Worker 尚未生成结果，并且没有默认值，将会报告错误：AutomaDataInjectionError。

让我们修改上述示例并添加一些打印消息。

In [None]:
class QueryExpansion(GraphAutoma):
    @worker(is_start=True)
    async def pre_query(self, query_obj: Dict):  # Receive the user's input and preprocess query
        query = query_obj["query"].split(":")[1].strip()  
        return query

    @worker(is_start=True)
    async def pre_date(self, query_obj: Dict):  # Receive the user's input and preprocess date
        date = query_obj["date"]
        return date

    @worker(dependencies=["pre_query"], args_mapping_rule=ArgsMappingRule.AS_IS)
    async def extract_entity(self, query: str):  # Extract the entity information from the question, get entity information.
        response: EntityList = await llm.astructured_output(  
            constraint=PydanticModel(model=EntityList),
            messages=[
                Message.from_text(text="extract the entity information from the given query", role=Role.SYSTEM),
                Message.from_text(text=query, role=Role.USER,),
            ]
        )
        return query, response.entities

    @worker(dependencies=["extract_entity"], is_output=True, args_mapping_rule=ArgsMappingRule.UNPACK)
    async def expand_query(self, query: str, entities: List[str], date: str = From("pre_date", "2025-01-01")):  # Expand and obtain multiple queries.
        print(f"query: {query}, entities: {entities}, date: {date}")
        task_input = f"Query: {query}\nEntities: {entities}"
        response: QueryList = await llm.astructured_output(
            constraint=PydanticModel(model=QueryList),
            messages=[
                Message.from_text(text=f"Centered around the given entities and given date {date}, expand the query to obtain multiple queries", role=Role.SYSTEM),
                Message.from_text(text=task_input, role=Role.USER,),
            ]
        )
        return response.queries

让我们运行它！

In [14]:
query_expansion = QueryExpansion()
await query_expansion.arun(query_obj)

query: What new developments have there been in RAG in the past year?, entities: ['RAG', 'new developments', 'past year'], date: 2025-09-30


['What are the latest advancements in Retrieval-Augmented Generation (RAG) technology as of 2025?',
 'What new developments have emerged in RAG systems over the past 12 months?',
 'How have RAG implementations evolved in the last year in terms of performance and scalability?',
 'What innovations in RAG have been introduced in 2025 that improve accuracy and context handling?',
 'What are the key breakthroughs in RAG research and deployment from 2024 to 2025?',
 'What new tools and frameworks have been released for RAG in the past year?',
 'How have privacy and security features improved in RAG systems over the last year?',
 'What are the most notable RAG developments in enterprise AI applications from 2024 to 2025?',
 "What recent improvements have been made to RAG's ability to handle long-context inputs?",
 'How has the integration of RAG with large language models evolved in the past year?']

我已经修改了 `extract_entity`，现在它只接受 `query`，使其功能更加纯粹。此外，在 `expand_query` 中，我正确地获取了 `date`。

### 输入传播

再回顾一下上面的例子，我们的程序根本没有处理输入中的 `id` 字段。最终，我们只返回了一系列通用问题，这可能导致外部调用无法关联哪个 "id" 对应于结果。然而，这个 ID 既不需要预处理，也不需要用于实体提取。

我们可以使用输入传播来解决这个问题。这可以通过在声明参数时将启动参数的名称添加到 worker 中来实现。

@worker(dependencies=["extract_entity"], is_output=True args_mapping_rule=ArgsMappingRule.UNPACK)
async def expand_query(
    self, 
    query: str, 
    entities: List[str], 
+    query_obj: Dict,  # 整个 Automa 的输入
    date: str = From("pre_date", "2025-01-01"), 
):  # 扩展并获取多个查询。
    ...

让我们修改上述示例并添加一些打印消息。

In [None]:
class QueryExpansion(GraphAutoma):
    @worker(is_start=True)
    async def pre_query(self, query_obj: Dict):  # Receive the user's input and preprocess query
        query = query_obj["query"].split(":")[1].strip()  
        return query

    @worker(is_start=True)
    async def pre_date(self, query_obj: Dict):  # Receive the user's input and preprocess date
        date = query_obj["date"]
        return date

    @worker(dependencies=["pre_query"], args_mapping_rule=ArgsMappingRule.AS_IS)
    async def extract_entity(self, query: str):  # Extract the entity information from the question, get entity information.
        response: EntityList = await llm.astructured_output(  
            constraint=PydanticModel(model=EntityList),
            messages=[
                Message.from_text(text="extract the entity information from the given query", role=Role.SYSTEM),
                Message.from_text(text=query, role=Role.USER,),
            ]
        )
        return query, response.entities

    @worker(dependencies=["extract_entity"], is_output=True, args_mapping_rule=ArgsMappingRule.UNPACK)
    async def expand_query(self, query: str, entities: List[str], query_obj: Dict, date: str = From("pre_date", "2025-01-01")):  # Expand and obtain multiple queries.
        print(f"query: {query}, entities: {entities}, date: {date}, query_obj: {query_obj}")
        task_input = f"Query: {query}\nEntities: {entities}"
        response: QueryList = await llm.astructured_output(
            constraint=PydanticModel(model=QueryList),
            messages=[
                Message.from_text(text=f"Centered around the given entities and given date {date}, expand the query to obtain multiple queries", role=Role.SYSTEM),
                Message.from_text(text=task_input, role=Role.USER,),
            ]
        )
        return {"id": query_obj["id"], "queries": response.queries}

让我们开始吧！在使用 Inputs Propagation 时，启动参数必须以关键字的形式在启动时传递。

In [None]:
query_expansion = QueryExpansion()
await query_expansion.arun(query_obj=query_obj)  # using keyword parameter passing

query: What new developments have there been in RAG in the past year?, entities: ['RAG', 'new developments', 'past year'], date: 2025-09-30, query_obj: {'id': 'user_1', 'query': 'Q: What new developments have there been in RAG in the past year?', 'date': '2025-09-30'}


{'id': 'user_1',
 'queries': ['What are the latest advancements in Retrieval-Augmented Generation (RAG) technologies as of 2025?',
  'What new developments have emerged in RAG systems over the past 12 months?',
  'How have RAG implementations evolved in the last year in terms of performance and scalability?',
  'What are the key innovations in RAG models reported between 2024 and 2025?',
  'What new techniques have been introduced in RAG to improve accuracy and context retention in the past year?',
  'What recent breakthroughs in RAG have been highlighted in 2025?',
  'How have industry leaders advanced RAG technology in the last year?',
  'What are the most significant updates in RAG frameworks and tools from 2024 to 2025?',
  'What new challenges and solutions have been proposed in RAG research over the past year?',
  'What developments in RAG have improved real-time retrieval and generation performance in 2025?']}

在上述所有参数传递方式中，优先顺序为：**参数映射位置参数 > 参数注入 > 传播 > 参数映射关键字参数**。