feat: async mechanism for multi-trace #981
base: main
# This is the very first experiment in a new tree.
return trace.NEW_ROOT

# Find the index of the last selected leaf in the current list of leaves
The current code will keep producing ideas even when previous ideas are not finished.
@@ -210,44 +216,68 @@ def feedback(self, prev_out: dict[str, Any]) -> ExperimentFeedback:
 logger.log_object(feedback)
 return feedback

-def record(self, prev_out: dict[str, Any]):
+async def record(self, prev_out: dict[str, Any]):
self.trace = DSTrace(scen=self.trace.scen, knowledge_base=self.trace.knowledge_base)

# ... (rest of original record logic) ...
if self.trace.sota_experiment() is None:
Why do we reduce the indent here?
async def async_gen(self, trace: DSTrace, loop: LoopBase) -> DSExperiment:
    # This now needs to align with the base class but might not be fully parallel-aware yet.
    # The new producer-consumer loop will set the context via trace.set_current_selection.
    return self.gen(trace)
I think this will result in a deadlock.
Ignore this if it is not implemented.
@@ -817,7 +822,7 @@ def gen(self, trace: DSTrace) -> DSExperiment:
 ]
 )

-sota_exp = trace.sota_experiment()
+sota_exp = trace.sota_experiment(selection=local_selection)
I think we can set current_selection here directly and take advantage of running in a single process.
rdagent/utils/workflow/loop.py
Outdated
@@ -212,7 +213,7 @@ async def _run_step(self, li: int, force_subproc: bool = False) -> None:
result = await curr_loop.run_in_executor(pool, func, self.loop_prev_out[li])
else:
# auto determine whether to run async or sync
if asyncio.iscoroutinefunction(func):
Curious about the difference between the two implementations.
Almost the same; rolled back to asyncio.iscoroutinefunction(func).
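For readers following this thread, the "auto determine whether to run async or sync" dispatch can be sketched as below. This is only an illustration, not the code in `loop.py`: `run_step`, `sync_step` and `async_step` are hypothetical names, and the sketch uses `inspect.iscoroutinefunction` from the standard library where the diff uses `asyncio.iscoroutinefunction`.

```python
import asyncio
import inspect
from typing import Any, Callable


async def run_step(func: Callable[[Any], Any], prev_out: Any) -> Any:
    """Await coroutine functions; call plain functions directly (hypothetical helper)."""
    if inspect.iscoroutinefunction(func):
        return await func(prev_out)
    return func(prev_out)


def sync_step(x: int) -> int:
    # A plain, synchronous workflow step.
    return x + 1


async def async_step(x: int) -> int:
    # An async workflow step that yields to the event loop once.
    await asyncio.sleep(0)
    return x * 2


async def main() -> tuple[int, int]:
    return await run_step(sync_step, 1), await run_step(async_step, 2)


sync_out, async_out = asyncio.run(main())
```

Both kinds of step go through the same entry point, which is what allows `record` to become `async` without changing how the loop invokes it.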
"""
Atomically selects the next leaf node from the trace in order.
"""
async with self._lock:
I think the lock is not necessary here.
rdagent/app/data_science/conf.py
Outdated
@@ -112,4 +112,9 @@ class DataScienceBasePropSetting(KaggleBasePropSetting):
"""The maximum number of SOTA experiments to retrieve in a LLM call"""

#### multi-trace: enable parallel multi-trace
enable_parallel_multi_trace: bool = False
If lock is not necessary, we can remove it.
dev_lop.md
Outdated
Remove it.
.gitignore
Outdated
@@ -5,7 +5,7 @@ Pipfile
public
release-notes.md
typescript*

deve_lop.md
Not necessary
@@ -23,9 +23,15 @@ def __init__(self, pending_tasks_list: list, *args, **kwargs) -> None:
# this field is optional. It is not none only when we have a format checker. Currently, only following cases are supported.
# - mle-bench

# For parallel multi-trace support
self.local_selection: tuple[int, ...] | None = None
This is very general. Please move it into the Experiment class.
@@ -16,6 +18,7 @@
from rdagent.scenarios.data_science.proposal.exp_gen.proposal import DSProposalV2ExpGen
from rdagent.utils.agent.tpl import T
from rdagent.utils.workflow import wait_retry
from rdagent.utils.workflow.loop import LoopBase
from rdagent.utils.workflow.loop import LoopBase
done
import json
from datetime import timedelta
from typing import Dict, Tuple

from rdagent.app.data_science.conf import DS_RD_SETTING
from rdagent.components.coder.data_science.pipeline.exp import PipelineTask
from rdagent.core.conf import RD_AGENT_SETTINGS
from rdagent.core.conf import RD_AGENT_SETTINGS
@@ -1,11 +1,13 @@
"""Merge the version in different traces"""

import asyncio
import asyncio
1. Core Design Philosophy
Adopt the "Explicit Context Passing & Attribute Injection" approach.
Goal: implement a concurrency-safe, multi-trace parallel exploration feature that can be toggled by configuration, while minimizing changes to the core loop (`loop.py`).
Core innovation: by passing an explicit `local_selection` parameter, we remove the dependency on the shared state `trace.current_selection`, which avoids the race condition at its source.
1.1. Technical Approach Comparison
Original Problem Analysis
In a parallel environment, the traditional shared-state approach is prone to race conditions, because concurrent tasks read and write the same `trace.current_selection`.
Solution: Explicit Context Passing
Advantages:
When `local_selection=None`, the traditional mode is used.
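The difference between the two approaches can be illustrated with a toy example. This is a minimal sketch and not the RD-Agent code: `Trace`, `gen_shared` and `gen_explicit` are hypothetical stand-ins, and `asyncio.sleep(0)` stands in for any awaited call (such as an LLM request) during which other tasks may run.

```python
import asyncio


class Trace:
    """Toy stand-in for a trace with a shared `current_selection` field."""

    def __init__(self) -> None:
        self.current_selection: tuple[int, ...] = ()


async def gen_shared(trace: Trace, selection: tuple[int, ...]) -> tuple[int, ...]:
    # Shared-state style: write the selection, yield to the event loop,
    # then read it back. Another task may have overwritten it meanwhile.
    trace.current_selection = selection
    await asyncio.sleep(0)
    return trace.current_selection


async def gen_explicit(trace: Trace, local_selection: tuple[int, ...]) -> tuple[int, ...]:
    # Explicit style: the selection travels with the call, so no other
    # task can change it while this coroutine is suspended.
    await asyncio.sleep(0)
    return local_selection


async def main() -> tuple[list, list]:
    trace = Trace()
    selections = [(0,), (1,), (2,)]
    shared = await asyncio.gather(*(gen_shared(trace, s) for s in selections))
    explicit = await asyncio.gather(*(gen_explicit(trace, s) for s in selections))
    return list(shared), list(explicit)


shared_result, explicit_result = asyncio.run(main())
```

In this toy run every shared-state task ends up seeing the last written selection `(2,)`, while the explicit version returns `(0,)`, `(1,)` and `(2,)` as intended.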
2. Code Modification Logic by Module
2.1. `DSExperiment` (Context Carrier)
File: rdagent/scenarios/data_science/experiment/experiment.py
Added to the `DSExperiment` class:
`self.local_selection: tuple[int, ...] | None = None`
`set_local_selection(local_selection: tuple[int, ...]) -> None`
This lets the `record` phase attach the experiment at the correct position in the trace graph.
2.2. `TraceScheduler` (Decision Maker)
File: rdagent/scenarios/data_science/proposal/exp_gen/trace_scheduler.py
`TraceScheduler` abstract base class: defines the scheduling interface.
`RoundRobinScheduler` implementation: round-robin scheduling strategy.
Uses `asyncio.Lock` to protect its own state, ensuring safe and fair target allocation in concurrent environments.
2.3. `ParallelMultiTraceExpGen` (Main Controller)
File: rdagent/scenarios/data_science/proposal/exp_gen/parallel.py
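A round-robin scheduler guarded by `asyncio.Lock` might look roughly like the sketch below. The class names follow the description, but the method name `next_selection`, the `leaves` argument and the `NEW_ROOT` value are assumptions made for illustration; the real interface in `trace_scheduler.py` may differ.

```python
import asyncio
from abc import ABC, abstractmethod

NEW_ROOT: tuple[int, ...] = ()  # assumed marker for "start a new tree"


class TraceScheduler(ABC):
    """Hypothetical interface: decide which leaf a new experiment should extend."""

    @abstractmethod
    async def next_selection(self, leaves: list[int]) -> tuple[int, ...]:
        ...


class RoundRobinScheduler(TraceScheduler):
    def __init__(self) -> None:
        self._last = -1  # position of the most recently served leaf
        self._lock = asyncio.Lock()

    async def next_selection(self, leaves: list[int]) -> tuple[int, ...]:
        # The lock makes "read position, advance, return" one atomic step,
        # even if this method later gains an await inside the block.
        async with self._lock:
            if not leaves:
                return NEW_ROOT
            self._last = (self._last + 1) % len(leaves)
            return (leaves[self._last],)


async def main() -> tuple[list, tuple]:
    scheduler = RoundRobinScheduler()
    leaves = [3, 7, 9]
    picks = await asyncio.gather(*(scheduler.next_selection(leaves) for _ in range(5)))
    empty_pick = await scheduler.next_selection([])
    return list(picks), empty_pick


picks, empty_pick = asyncio.run(main())
```

Because the guarded block in this sketch contains no `await`, a single event loop would not interleave it even without the lock. The lock only starts to matter once the selection logic itself awaits something.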
2.4. `DSProposalV2ExpGen` (Underlying Generator)
File: rdagent/scenarios/data_science/proposal/exp_gen/proposal.py
`gen()` method signature and implementation: takes a `local_selection` parameter.
When `local_selection=None`, the original behavior is kept.
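The backward-compatible default can be sketched as follows. `ToyTrace`, its `sota_by_leaf` mapping and the string results are hypothetical. Only the `local_selection=None` fallback and the `sota_experiment(selection=...)` call shape are taken from the description and the diff.

```python
from __future__ import annotations


class ToyTrace:
    """Hypothetical trace: maps a leaf index to the name of its best experiment."""

    def __init__(self) -> None:
        self.current_selection: tuple[int, ...] = (0,)
        self.sota_by_leaf = {0: "exp-A", 1: "exp-B"}

    def sota_experiment(self, selection: tuple[int, ...] | None = None) -> str | None:
        # Fall back to the shared selection only when no explicit one is given.
        sel = self.current_selection if selection is None else selection
        if not sel:
            return None
        return self.sota_by_leaf.get(sel[0])


def gen(trace: ToyTrace, local_selection: tuple[int, ...] | None = None) -> str:
    # The selection is passed down explicitly instead of being read from shared state.
    sota_exp = trace.sota_experiment(selection=local_selection)
    return f"improve {sota_exp}" if sota_exp is not None else "start from scratch"


trace = ToyTrace()
legacy = gen(trace)                           # old call sites keep working
branch = gen(trace, local_selection=(1,))     # parallel mode targets a branch
fresh = gen(trace, local_selection=())        # empty selection: no parent yet
```

Existing callers that never pass `local_selection` see no change, which is the compatibility property this section describes.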
2.5. `DataScienceRDLoop` (Core Loop)
File: rdagent/scenarios/data_science/loop.py
`direct_exp_gen`: remove the checkpoint selection logic and focus on calling `exp_gen.async_gen()`.
`record` method changed from sync to async:
Before: `def record(self, prev_out: dict[str, Any]) -> dict[str, Any]`
After: `async def record(self, prev_out: dict[str, Any]) -> dict[str, Any]`
`_perform_record`: add state synchronization logic.
Changing `record` to `async` ensures compatibility with the entire async workflow and prepares for potential future async extensions.
3. Workflow Details
3.1. Parallel Generation Phase
3.2. Recording Synchronization Phase
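As a rough illustration of how the two phases fit together, the sketch below generates experiments concurrently and then records them one at a time, using each experiment's `local_selection` to find its parent. The `Experiment` and `Trace` classes and the `parent` list are simplified stand-ins invented for this example. Only `local_selection` and `set_local_selection` come from the description.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field


@dataclass
class Experiment:
    name: str
    local_selection: tuple[int, ...] | None = None

    def set_local_selection(self, local_selection: tuple[int, ...]) -> None:
        self.local_selection = local_selection


@dataclass
class Trace:
    hist: list[Experiment] = field(default_factory=list)
    parent: list[int | None] = field(default_factory=list)

    def record(self, exp: Experiment) -> int:
        # An empty or missing selection starts a new root; otherwise the
        # experiment is attached under the node it was generated from.
        sel = exp.local_selection
        self.hist.append(exp)
        self.parent.append(sel[0] if sel else None)
        return len(self.hist) - 1


async def async_gen(name: str, selection: tuple[int, ...]) -> Experiment:
    await asyncio.sleep(0)  # placeholder for the slow proposal step
    exp = Experiment(name)
    exp.set_local_selection(selection)  # the context travels with the experiment
    return exp


async def main() -> Trace:
    trace = Trace()
    trace.record(Experiment("root-a", ()))  # index 0
    trace.record(Experiment("root-b", ()))  # index 1
    # Generation phase: both branches are extended concurrently.
    exps = await asyncio.gather(
        async_gen("child-of-a", (0,)),
        async_gen("child-of-b", (1,)),
    )
    # Record phase: runs serially, so no lock is needed here.
    for exp in exps:
        trace.record(exp)
    return trace


trace = asyncio.run(main())
```

Each child ends up under the branch it was generated for, regardless of the order in which the generation tasks finish.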
4. Key Technical Features
4.1. Concurrency Safety
`local_selection` variables replace the shared selection state.
`RoundRobinScheduler` uses `asyncio.Lock` to protect its own state.
`_perform_record` executes serially by nature, so no additional locks are needed.
4.2. Intelligent Branch Management
4.3. Backward Compatibility
The `local_selection` parameter defaults to `None`, maintaining the original behavior.
5. Usage
5.1. Configuration Enablement
5.2. ExpGen Selection
5.3. Scheduling Strategy Configuration
📚 Documentation preview 📚: https://RDAgent--981.org.readthedocs.build/en/981/