
feat: add pipeline operator extraction #12

Merged
MOLYHECI merged 17 commits into OpenDCAI:backend from duanchy3:pr10-branch
Dec 7, 2025

Conversation

@duanchy3
Contributor

@duanchy3 duanchy3 commented Dec 2, 2025

No description provided.

Copilot AI review requested due to automatic review settings December 6, 2025 07:18

Copilot AI left a comment


Pull request overview

This PR adds a new feature to automatically extract and track pipeline operators from Python files in the api_pipelines directory. The implementation changes the pipeline registry from an in-memory dictionary to a YAML file-based persistence layer.

Key Changes:

  • Adds AST-based operator extraction from Python pipeline files to automatically detect operator execution order
  • Migrates pipeline storage from in-memory dictionary to YAML file persistence
  • Replaces Pydantic schema-based data handling with plain dictionaries in the registry service layer
  • Adds parameter transformation logic to convert frontend list format to dictionary format
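The AST-based extraction described above can be sketched in a few lines. The pipeline class and operator names below are invented for illustration; the real implementation in pipeline_registry.py handles more cases (see the review comments further down):

```python
import ast

SOURCE = '''
class DemoPipeline:
    def __init__(self):
        self.filter = LanguageFilter()
        self.dedup = dedup_ops.MinHashDeduplicator()

    def forward(self):
        self.filter.run()
        self.dedup.run()
'''

def extract_operator_order(source: str) -> list:
    """Map self.<var> = Class() assignments in __init__, then read the
    .run() call order from forward() to recover the execution order."""
    tree = ast.parse(source)
    var_to_class, order = {}, []
    for node in ast.walk(tree):
        if not isinstance(node, ast.ClassDef):
            continue
        for method in node.body:
            if isinstance(method, ast.FunctionDef) and method.name == '__init__':
                for stmt in method.body:
                    if isinstance(stmt, ast.Assign) and isinstance(stmt.value, ast.Call):
                        target = stmt.targets[0]
                        if (isinstance(target, ast.Attribute)
                                and isinstance(target.value, ast.Name)
                                and target.value.id == 'self'):
                            func = stmt.value.func
                            # Handle both Class() and module.Class() forms.
                            name = func.id if isinstance(func, ast.Name) else func.attr
                            var_to_class[target.attr] = name
        for method in node.body:
            if isinstance(method, ast.FunctionDef) and method.name == 'forward':
                for stmt in ast.walk(method):
                    if (isinstance(stmt, ast.Expr) and isinstance(stmt.value, ast.Call)
                            and isinstance(stmt.value.func, ast.Attribute)
                            and stmt.value.func.attr == 'run'
                            and isinstance(stmt.value.func.value, ast.Attribute)):
                        # self.<var>.run() -> look up the class bound to <var>.
                        order.append(var_to_class.get(stmt.value.func.value.attr))
    return order

assert extract_operator_order(SOURCE) == ['LanguageFilter', 'MinHashDeduplicator']
```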

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 15 comments.

Reviewed files:

  • backend/app/services/pipeline_registry.py: Complete rewrite adding YAML persistence, AST-based operator extraction, and parameter parsing utilities
  • backend/app/services/operator_registry.py: Adds get_op_details() method to fetch operator metadata for enriching pipeline configurations
  • backend/app/schemas/pipelines.py: Adds file_path field to config, changes params type to Any, comments out validators, adds unused imports
  • backend/app/core/config.py: Changes pipeline registry file format from JSON to YAML
  • backend/app/api/v1/endpoints/pipelines.py: Adds parameter transformation logic to convert list-based params to dict format before processing
Comments suppressed due to low confidence (5)

backend/app/api/v1/endpoints/pipelines.py:102

  • Inconsistent parameter transformation logic. The parse_frontend_params is called in three different places (lines 40, 65, and 97) to transform params from list to dict format. However, this transformation is only applied when creating/updating pipelines or executing with custom config, but not when executing a predefined pipeline by ID (line 100-102 path). This inconsistency could cause issues if a predefined pipeline is executed and its operators have params in the old list format. Consider ensuring consistent params handling across all execution paths.
        if payload.config:
            for op in payload.config.operators:
                op.params = _PIPELINE_REGISTRY.parse_frontend_params(op.params)

        # Call the service layer to start execution
        execution_id, pipeline_config, initial_result = _PIPELINE_REGISTRY.start_execution(
            pipeline_id=payload.pipeline_id,
            config=payload.config
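For context, the list-to-dict transformation the reviewer refers to might look like the following. This is a hypothetical sketch; the actual parse_frontend_params implementation is not shown in this diff:

```python
from typing import Any, Dict, List, Union

def parse_frontend_params(params: Union[List[Dict[str, Any]], Dict[str, Any]]) -> Dict[str, Any]:
    """Convert the frontend's list-of-{name, value} entries into a flat dict.

    Params that are already dicts pass through unchanged, which is why
    skipping the transformation on one execution path can go unnoticed
    until a list-format payload slips through.
    """
    if isinstance(params, dict):
        return params
    return {entry["name"]: entry["value"] for entry in params}

# A list in the frontend format...
frontend = [{"name": "threshold", "value": 0.5}, {"name": "lang", "value": "en"}]
# ...becomes a plain dict for the service layer.
assert parse_frontend_params(frontend) == {"threshold": 0.5, "lang": "en"}
```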

backend/app/schemas/pipelines.py:47

  • This comment appears to contain commented-out code.
    # def validate_operator_name(cls, v: str) -> str:
    #     """Validate the operator name format"""
    #     if not v.replace('_', '').isalnum():
    #         raise ValueError('Operator name can only contain letters, numbers and underscores')
    #         # Later, validation that the operator name exists in the available operator set can be added
    #     return v

backend/app/schemas/pipelines.py:61

  • This comment appears to contain commented-out code.
    # def validate_operators(cls, v: List[PipelineOperator]) -> List[PipelineOperator]:
    #     """Ensure there is at least one operator"""
    #     if not v:
    #         raise ValueError('Pipeline must have at least one operator')
    #     return v

backend/app/schemas/pipelines.py:4

  • Import of 'OperatorDetailSchema' is not used.
from app.schemas.operator import OperatorDetailSchema

backend/app/schemas/pipelines.py:5

  • Import of 'FileStorage' is not used.
from dataflow.utils.storage import FileStorage


        Enrich pipeline operators with detailed parameter info from registry.
        """
        # Deep copy to avoid modifying original data
        pipeline = json.loads(json.dumps(pipeline_data))

Copilot AI Dec 6, 2025


Inefficient deep copy using JSON serialization. Line 369 uses json.loads(json.dumps(pipeline_data)) for deep copying, which is inefficient and has limitations (e.g., can't handle datetime objects, custom classes). Python's copy.deepcopy() from the standard library is more appropriate and handles more data types. Consider replacing with: import copy; pipeline = copy.deepcopy(pipeline_data)
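The difference the reviewer points out can be demonstrated directly; the sample data below is invented for illustration:

```python
import copy
import json
from datetime import datetime

pipeline_data = {"name": "demo", "operators": [{"name": "Filter", "params": {"k": 1}}]}

# JSON round-trip: works for plain JSON-serializable data...
via_json = json.loads(json.dumps(pipeline_data))
assert via_json == pipeline_data and via_json is not pipeline_data

# ...but fails as soon as a non-JSON type appears in the structure.
with_ts = {**pipeline_data, "created_at": datetime(2025, 12, 2)}
try:
    json.loads(json.dumps(with_ts))
except TypeError:
    pass  # datetime is not JSON serializable

# copy.deepcopy handles arbitrary Python objects and avoids the
# serialize/parse overhead of the JSON round-trip.
via_copy = copy.deepcopy(with_ts)
assert via_copy == with_ts and via_copy is not with_ts
```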

        Ensure the registry file exists, and load all .py files in the api_pipelines directory
        """
        if not os.path.exists(self.path):
            os.makedirs(os.path.dirname(self.path), exist_ok=True)

Copilot AI Dec 6, 2025


Potential bug with directory creation. When the registry file doesn't exist, os.path.dirname(self.path) could return an empty string if self.path is just a filename without directory separators. This would cause os.makedirs to either fail or create nothing. Consider adding a check or using an absolute path.

Suggested change
            os.makedirs(os.path.dirname(self.path), exist_ok=True)
            dir_name = os.path.dirname(self.path)
            if dir_name:
                os.makedirs(dir_name, exist_ok=True)

        # Later, validation that the operator name exists in the available operator set can be added
        return v

    params: Any = Field(default_factory=dict, description="Operator parameter configuration")

Copilot AI Dec 6, 2025


Type annotation uses ambiguous Any type for params field. The params field type was changed from Dict[str, Any] to Any on line 39. This removes type safety and makes it unclear what data structure is expected (could be dict, list, or anything else). This inconsistency is evident in the code where params is sometimes treated as a dict (line 378) and sometimes as a list (lines 40, 65, 97). Consider using a Union type to explicitly indicate the accepted types: params: Union[Dict[str, Any], List[Dict[str, Any]]] = Field(default_factory=dict, ...)

Suggested change
    params: Any = Field(default_factory=dict, description="Operator parameter configuration")
    params: Union[Dict[str, Any], List[Dict[str, Any]]] = Field(default_factory=dict, description="Operator parameter configuration")

Comment on lines +123 to +131
    def _read(self) -> Dict:
        """Read the registry file"""
        with open(self.path, "r", encoding="utf-8") as f:
            return yaml.safe_load(f) or {"pipelines": {}, "executions": {}}

    def _write(self, data: Dict):
        """Write the registry file"""
        with open(self.path, "w", encoding="utf-8") as f:
            yaml.safe_dump(data, f, allow_unicode=True, sort_keys=False)

Copilot AI Dec 6, 2025


Concurrency issue with file I/O. Multiple methods (_read, _write, start_execution, execute_pipeline_task, etc.) read and write to the same YAML file without any locking mechanism. In a multi-threaded environment (which FastAPI uses), concurrent requests could lead to race conditions where:

  1. Two requests read the file simultaneously
  2. Both modify their in-memory copies
  3. Both write back, with the second write overwriting the first's changes

Consider implementing file locking (e.g., using fcntl on Unix or msvcrt on Windows) or using a proper database for persistence.
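A minimal sketch of the locking idea, assuming a single FastAPI worker process. The class and method names are illustrative, not from the PR, and json stands in for YAML to keep the sketch dependency-free; multiple worker processes would still need an OS-level file lock (fcntl/msvcrt) or a real database:

```python
import json
import os
import tempfile
import threading

class LockedRegistry:
    """Serialize read-modify-write cycles on a registry file."""

    def __init__(self, path: str):
        self.path = path
        self._lock = threading.Lock()

    def update(self, mutate) -> dict:
        # Hold the lock across the whole read-modify-write cycle so a
        # concurrent request cannot interleave between the read and the write.
        with self._lock:
            try:
                with open(self.path, "r", encoding="utf-8") as f:
                    data = json.load(f)
            except (FileNotFoundError, json.JSONDecodeError):
                data = {"pipelines": {}, "executions": {}}
            mutate(data)
            with open(self.path, "w", encoding="utf-8") as f:
                json.dump(data, f, ensure_ascii=False)
            return data

# Usage: mutate under the lock, then read back a consistent snapshot.
path = os.path.join(tempfile.mkdtemp(), "registry.json")
registry = LockedRegistry(path)
registry.update(lambda d: d["pipelines"].update({"p1": {"name": "demo"}}))
snapshot = registry.update(lambda d: None)
```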

Comment on lines +451 to +492
# First, find all operator instances created in the __init__ method
for node in ast.walk(tree):
    # Look for class definitions
    if isinstance(node, ast.ClassDef):
        # Look for the __init__ method
        for method in node.body:
            if isinstance(method, ast.FunctionDef) and method.name == '__init__':
                # Analyze assignment statements inside __init__
                for stmt in method.body:
                    if isinstance(stmt, ast.Assign):
                        for target in stmt.targets:
                            # Check for the form self.var = Class()
                            if isinstance(target, ast.Attribute) and isinstance(target.value, ast.Name) and target.value.id == 'self':
                                var_name = target.attr
                                # Check whether the right-hand side is a call expression
                                if isinstance(stmt.value, ast.Call):
                                    # Get the class name
                                    if isinstance(stmt.value.func, ast.Name):
                                        class_name = stmt.value.func.id
                                        var_to_class_map[var_name] = class_name
                                        logger.debug(f"Found operator: {var_name} = {class_name}")
                                    elif isinstance(stmt.value.func, ast.Attribute):
                                        # Handle the module.Class() form
                                        class_name = stmt.value.func.attr
                                        var_to_class_map[var_name] = class_name
                                        logger.debug(f"Found operator: {var_name} = {class_name}")

    # Look for the forward method
    if isinstance(node, ast.ClassDef):
        for method in node.body:
            if isinstance(method, ast.FunctionDef) and method.name == 'forward':
                # Analyze .run() calls inside the forward method
                for stmt in ast.walk(method):
                    if isinstance(stmt, ast.Expr) and isinstance(stmt.value, ast.Call):
                        # Check for the form self.var.run()
                        if isinstance(stmt.value.func, ast.Attribute) and stmt.value.func.attr == 'run':
                            if isinstance(stmt.value.func.value, ast.Attribute) and isinstance(stmt.value.func.value.value, ast.Name) and stmt.value.func.value.value.id == 'self':
                                var_name = stmt.value.func.value.attr
                                if var_name in var_to_class_map:
                                    operator_class_names.append(var_to_class_map[var_name])
                                    logger.debug(f"Found operator execution: {var_name}.run() -> {var_to_class_map[var_name]}")


Copilot AI Dec 6, 2025


Inefficient AST traversal pattern. The code uses two separate ast.walk(tree) loops (lines 452 and 479) to find __init__ and forward methods. Both loops iterate through the entire tree, which is inefficient. Consider combining them into a single traversal that processes both methods in one pass, or better yet, iterate through node.body directly when you find a ClassDef instead of using nested ast.walk calls.

Suggested change
# First, find all operator instances created in the __init__ method
for node in ast.walk(tree):
    # Look for class definitions
    if isinstance(node, ast.ClassDef):
        # Look for the __init__ method
        for method in node.body:
            if isinstance(method, ast.FunctionDef) and method.name == '__init__':
                # Analyze assignment statements inside __init__
                for stmt in method.body:
                    if isinstance(stmt, ast.Assign):
                        for target in stmt.targets:
                            # Check for the form self.var = Class()
                            if isinstance(target, ast.Attribute) and isinstance(target.value, ast.Name) and target.value.id == 'self':
                                var_name = target.attr
                                # Check whether the right-hand side is a call expression
                                if isinstance(stmt.value, ast.Call):
                                    # Get the class name
                                    if isinstance(stmt.value.func, ast.Name):
                                        class_name = stmt.value.func.id
                                        var_to_class_map[var_name] = class_name
                                        logger.debug(f"Found operator: {var_name} = {class_name}")
                                    elif isinstance(stmt.value.func, ast.Attribute):
                                        # Handle the module.Class() form
                                        class_name = stmt.value.func.attr
                                        var_to_class_map[var_name] = class_name
                                        logger.debug(f"Found operator: {var_name} = {class_name}")
    # Look for the forward method
    if isinstance(node, ast.ClassDef):
        for method in node.body:
            if isinstance(method, ast.FunctionDef) and method.name == 'forward':
                # Analyze .run() calls inside the forward method
                for stmt in ast.walk(method):
                    if isinstance(stmt, ast.Expr) and isinstance(stmt.value, ast.Call):
                        # Check for the form self.var.run()
                        if isinstance(stmt.value.func, ast.Attribute) and stmt.value.func.attr == 'run':
                            if isinstance(stmt.value.func.value, ast.Attribute) and isinstance(stmt.value.func.value.value, ast.Name) and stmt.value.func.value.value.id == 'self':
                                var_name = stmt.value.func.value.attr
                                if var_name in var_to_class_map:
                                    operator_class_names.append(var_to_class_map[var_name])
                                    logger.debug(f"Found operator execution: {var_name}.run() -> {var_to_class_map[var_name]}")
# Walk the AST once, find all class definitions, and look for __init__ and forward in each class
for node in ast.walk(tree):
    if isinstance(node, ast.ClassDef):
        # Record this class's variable-to-class-name mapping
        class_var_to_class_map = {}
        # Process the __init__ and forward methods
        for method in node.body:
            if isinstance(method, ast.FunctionDef):
                if method.name == '__init__':
                    # Analyze assignment statements inside __init__
                    for stmt in method.body:
                        if isinstance(stmt, ast.Assign):
                            for target in stmt.targets:
                                # Check for the form self.var = Class()
                                if isinstance(target, ast.Attribute) and isinstance(target.value, ast.Name) and target.value.id == 'self':
                                    var_name = target.attr
                                    # Check whether the right-hand side is a call expression
                                    if isinstance(stmt.value, ast.Call):
                                        # Get the class name
                                        if isinstance(stmt.value.func, ast.Name):
                                            class_name = stmt.value.func.id
                                            class_var_to_class_map[var_name] = class_name
                                            logger.debug(f"Found operator: {var_name} = {class_name}")
                                        elif isinstance(stmt.value.func, ast.Attribute):
                                            # Handle the module.Class() form
                                            class_name = stmt.value.func.attr
                                            class_var_to_class_map[var_name] = class_name
                                            logger.debug(f"Found operator: {var_name} = {class_name}")
                elif method.name == 'forward':
                    # Analyze .run() calls inside the forward method
                    for stmt in ast.walk(method):
                        if isinstance(stmt, ast.Expr) and isinstance(stmt.value, ast.Call):
                            # Check for the form self.var.run()
                            if isinstance(stmt.value.func, ast.Attribute) and stmt.value.func.attr == 'run':
                                if isinstance(stmt.value.func.value, ast.Attribute) and isinstance(stmt.value.func.value.value, ast.Name) and stmt.value.func.value.value.id == 'self':
                                    var_name = stmt.value.func.value.attr
                                    # Prefer this class's mapping, then fall back to the global one
                                    class_name = class_var_to_class_map.get(var_name) or var_to_class_map.get(var_name)
                                    if class_name:
                                        operator_class_names.append(class_name)
                                        logger.debug(f"Found operator execution: {var_name}.run() -> {class_name}")
        # Merge this class's mapping into the global mapping
        var_to_class_map.update(class_var_to_class_map)

Comment on lines +95 to +97
        if payload.config:
            for op in payload.config.operators:
                op.params = _PIPELINE_REGISTRY.parse_frontend_params(op.params)

Copilot AI Dec 6, 2025


Modifying Pydantic model instance directly is not recommended. On line 97, the code directly modifies op.params which is an attribute of a Pydantic model instance (payload.config.operators). This bypasses Pydantic's validation and can lead to inconsistent state. Consider creating a new model instance or converting to dict first: config_dict = payload.config.model_dump() and then modifying the dict before passing it to the service layer.
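A sketch of the model_dump approach the reviewer suggests. The model classes and the to_dict_params helper below are simplified stand-ins, not the actual schemas from pipelines.py:

```python
from typing import Any, Dict, List
from pydantic import BaseModel, Field

class PipelineOperator(BaseModel):
    name: str
    params: Any = Field(default_factory=dict)

class PipelineConfig(BaseModel):
    operators: List[PipelineOperator]

def to_dict_params(params: Any) -> Dict[str, Any]:
    # Stand-in for parse_frontend_params: list-of-{name, value} -> dict.
    if isinstance(params, list):
        return {p["name"]: p["value"] for p in params}
    return params

config = PipelineConfig(operators=[
    PipelineOperator(name="Filter", params=[{"name": "k", "value": 3}]),
])

# Convert to a plain dict first, transform the copy, and hand the dict to
# the service layer instead of mutating the validated model in place.
config_dict = config.model_dump()
for op in config_dict["operators"]:
    op["params"] = to_dict_params(op["params"])

assert config_dict["operators"][0]["params"] == {"k": 3}
# The original Pydantic instance is left untouched.
assert config.operators[0].params == [{"name": "k", "value": 3}]
```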

Comment on lines +4 to +5
from app.schemas.operator import OperatorDetailSchema
from dataflow.utils.storage import FileStorage

Copilot AI Dec 6, 2025


Unused imports detected. OperatorDetailSchema and FileStorage are imported but never used in this file. Consider removing them to keep the code clean.

Suggested change
from app.schemas.operator import OperatorDetailSchema
from dataflow.utils.storage import FileStorage

Comment on lines +452 to +492
for node in ast.walk(tree):
    # Look for class definitions
    if isinstance(node, ast.ClassDef):
        # Look for the __init__ method
        for method in node.body:
            if isinstance(method, ast.FunctionDef) and method.name == '__init__':
                # Analyze assignment statements inside __init__
                for stmt in method.body:
                    if isinstance(stmt, ast.Assign):
                        for target in stmt.targets:
                            # Check for the form self.var = Class()
                            if isinstance(target, ast.Attribute) and isinstance(target.value, ast.Name) and target.value.id == 'self':
                                var_name = target.attr
                                # Check whether the right-hand side is a call expression
                                if isinstance(stmt.value, ast.Call):
                                    # Get the class name
                                    if isinstance(stmt.value.func, ast.Name):
                                        class_name = stmt.value.func.id
                                        var_to_class_map[var_name] = class_name
                                        logger.debug(f"Found operator: {var_name} = {class_name}")
                                    elif isinstance(stmt.value.func, ast.Attribute):
                                        # Handle the module.Class() form
                                        class_name = stmt.value.func.attr
                                        var_to_class_map[var_name] = class_name
                                        logger.debug(f"Found operator: {var_name} = {class_name}")

    # Look for the forward method
    if isinstance(node, ast.ClassDef):
        for method in node.body:
            if isinstance(method, ast.FunctionDef) and method.name == 'forward':
                # Analyze .run() calls inside the forward method
                for stmt in ast.walk(method):
                    if isinstance(stmt, ast.Expr) and isinstance(stmt.value, ast.Call):
                        # Check for the form self.var.run()
                        if isinstance(stmt.value.func, ast.Attribute) and stmt.value.func.attr == 'run':
                            if isinstance(stmt.value.func.value, ast.Attribute) and isinstance(stmt.value.func.value.value, ast.Name) and stmt.value.func.value.value.id == 'self':
                                var_name = stmt.value.func.value.attr
                                if var_name in var_to_class_map:
                                    operator_class_names.append(var_to_class_map[var_name])
                                    logger.debug(f"Found operator execution: {var_name}.run() -> {var_to_class_map[var_name]}")


Copilot AI Dec 6, 2025


Potential bug in AST parsing logic. The nested loop structure at line 479-491 will process the same ClassDef multiple times from the outer ast.walk(tree) loop. Additionally, using ast.walk(method) on line 483 to find statements in the forward method is inefficient and may match statements from nested functions. The logic should iterate directly through method.body instead of calling ast.walk(method). This could lead to incorrect or duplicate operator detection.

Suggested change
for node in ast.walk(tree):
    # Look for class definitions
    if isinstance(node, ast.ClassDef):
        # Look for the __init__ method
        for method in node.body:
            if isinstance(method, ast.FunctionDef) and method.name == '__init__':
                # Analyze assignment statements inside __init__
                for stmt in method.body:
                    if isinstance(stmt, ast.Assign):
                        for target in stmt.targets:
                            # Check for the form self.var = Class()
                            if isinstance(target, ast.Attribute) and isinstance(target.value, ast.Name) and target.value.id == 'self':
                                var_name = target.attr
                                # Check whether the right-hand side is a call expression
                                if isinstance(stmt.value, ast.Call):
                                    # Get the class name
                                    if isinstance(stmt.value.func, ast.Name):
                                        class_name = stmt.value.func.id
                                        var_to_class_map[var_name] = class_name
                                        logger.debug(f"Found operator: {var_name} = {class_name}")
                                    elif isinstance(stmt.value.func, ast.Attribute):
                                        # Handle the module.Class() form
                                        class_name = stmt.value.func.attr
                                        var_to_class_map[var_name] = class_name
                                        logger.debug(f"Found operator: {var_name} = {class_name}")
    # Look for the forward method
    if isinstance(node, ast.ClassDef):
        for method in node.body:
            if isinstance(method, ast.FunctionDef) and method.name == 'forward':
                # Analyze .run() calls inside the forward method
                for stmt in ast.walk(method):
                    if isinstance(stmt, ast.Expr) and isinstance(stmt.value, ast.Call):
                        # Check for the form self.var.run()
                        if isinstance(stmt.value.func, ast.Attribute) and stmt.value.func.attr == 'run':
                            if isinstance(stmt.value.func.value, ast.Attribute) and isinstance(stmt.value.func.value.value, ast.Name) and stmt.value.func.value.value.id == 'self':
                                var_name = stmt.value.func.value.attr
                                if var_name in var_to_class_map:
                                    operator_class_names.append(var_to_class_map[var_name])
                                    logger.debug(f"Found operator execution: {var_name}.run() -> {var_to_class_map[var_name]}")
# Only process top-level class definitions to avoid duplicate and nested processing
for node in [n for n in ast.iter_child_nodes(tree) if isinstance(n, ast.ClassDef)]:
    # Look for the __init__ method
    for method in node.body:
        if isinstance(method, ast.FunctionDef) and method.name == '__init__':
            # Analyze assignment statements inside __init__
            for stmt in method.body:
                if isinstance(stmt, ast.Assign):
                    for target in stmt.targets:
                        # Check for the form self.var = Class()
                        if isinstance(target, ast.Attribute) and isinstance(target.value, ast.Name) and target.value.id == 'self':
                            var_name = target.attr
                            # Check whether the right-hand side is a call expression
                            if isinstance(stmt.value, ast.Call):
                                # Get the class name
                                if isinstance(stmt.value.func, ast.Name):
                                    class_name = stmt.value.func.id
                                    var_to_class_map[var_name] = class_name
                                    logger.debug(f"Found operator: {var_name} = {class_name}")
                                elif isinstance(stmt.value.func, ast.Attribute):
                                    # Handle the module.Class() form
                                    class_name = stmt.value.func.attr
                                    var_to_class_map[var_name] = class_name
                                    logger.debug(f"Found operator: {var_name} = {class_name}")
    # Look for the forward method
    for method in node.body:
        if isinstance(method, ast.FunctionDef) and method.name == 'forward':
            # Only walk forward's top-level statements, skipping .run() calls in nested functions
            for stmt in method.body:
                if isinstance(stmt, ast.Expr) and isinstance(stmt.value, ast.Call):
                    # Check for the form self.var.run()
                    if isinstance(stmt.value.func, ast.Attribute) and stmt.value.func.attr == 'run':
                        if isinstance(stmt.value.func.value, ast.Attribute) and isinstance(stmt.value.func.value.value, ast.Name) and stmt.value.func.value.value.id == 'self':
                            var_name = stmt.value.func.value.attr
                            if var_name in var_to_class_map:
                                operator_class_names.append(var_to_class_map[var_name])
                                logger.debug(f"Found operator execution: {var_name}.run() -> {var_to_class_map[var_name]}")

Comment on lines +313 to +315
        data = self._read()
        data["executions"][execution_id] = execution_result
        self._write(data)

Copilot AI Dec 6, 2025


Memory leak potential with unbounded executions storage. The executions dictionary in the YAML file will grow indefinitely as new pipeline executions are added (lines 314, 346). There's no mechanism to clean up old execution results. Over time, this could lead to:

  1. Large YAML files that are slow to read/write
  2. Memory issues when loading the entire file
  3. Performance degradation

Consider implementing a cleanup mechanism (e.g., TTL-based deletion, max size limit) or using a proper database for execution history.
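A minimal sketch of such a cleanup, combining a TTL with a size cap. It assumes each execution record carries a 'created_at' Unix timestamp; the PR's actual record shape may differ, and the constants are illustrative:

```python
import time

MAX_EXECUTIONS = 100                    # size cap (illustrative)
EXECUTION_TTL_SECONDS = 7 * 24 * 3600   # one week (illustrative)

def prune_executions(data: dict, now: float = None) -> dict:
    """Drop execution records that are too old or beyond the size cap.

    Intended to be called on the registry data before _write(), so the
    YAML file stays bounded as new executions accumulate.
    """
    now = now if now is not None else time.time()
    executions = data.get("executions", {})
    # 1. TTL: drop anything older than the cutoff.
    fresh = {
        eid: rec for eid, rec in executions.items()
        if now - rec.get("created_at", now) <= EXECUTION_TTL_SECONDS
    }
    # 2. Size cap: keep only the newest MAX_EXECUTIONS records.
    newest = sorted(fresh.items(), key=lambda kv: kv[1].get("created_at", 0), reverse=True)
    data["executions"] = dict(newest[:MAX_EXECUTIONS])
    return data

# 'stale' is far older than the TTL and gets dropped; 'recent' survives.
data = {"executions": {"stale": {"created_at": 0}, "recent": {"created_at": 999_999_940.0}}}
pruned = prune_executions(data, now=1_000_000_000.0)
```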

Comment on lines +91 to +95
    # def validate_at_least_one(cls, v, info):
    #     """Ensure at least one of pipeline_id or config is provided"""
    #     if info.data.get('pipeline_id') is None and info.data.get('config') is None:
    #         raise ValueError('Either pipeline_id or config must be provided')
    #     return v

Copilot AI Dec 6, 2025


This comment appears to contain commented-out code.

@MOLYHECI
Collaborator

MOLYHECI commented Dec 7, 2025

LGTM. Thank you very much!

@MOLYHECI MOLYHECI merged commit d608530 into OpenDCAI:backend Dec 7, 2025

4 participants