diff --git a/backend/app/api/v1/endpoints/tasks.py b/backend/app/api/v1/endpoints/tasks.py index 8063761..2d3997d 100644 --- a/backend/app/api/v1/endpoints/tasks.py +++ b/backend/app/api/v1/endpoints/tasks.py @@ -84,6 +84,37 @@ def get_task_result(task_id: str, step: int = None, limit: int = 5): raise HTTPException(500, f"Failed to get task result: {str(e)}") +@router.get("/execution/{task_id}/log", response_model=ApiResponse[List[str]], operation_id="get_execution_log", summary="获取任务日志") +def get_execution_log(task_id: str, operator_name: str = Query(None, description="算子名称")): + """ + 获取任务日志 + + Args: + task_id: 任务 ID + operator_name: 算子名称(可选,指定时返回该算子的日志) + + Returns: + 日志列表 + """ + try: + logger.info(f"Request: GET /execution/{task_id}/log, operator_name={operator_name}") + + # 检查任务是否存在 + task = container.task_registry.get(task_id) + if not task: + raise HTTPException(404, f"Task with id {task_id} not found") + + logs = container.task_registry.get_execution_logs(task_id, operator_name) + + return ok(logs) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Failed to get task logs: {e}") + raise HTTPException(500, f"Failed to get task logs: {str(e)}") + + @router.get("/execution/{task_id}/download", operation_id="download_task_result", summary="下载任务执行结果文件") def download_task_result(task_id: str, step: int = None): """ diff --git a/backend/app/services/dataflow_engine.py b/backend/app/services/dataflow_engine.py index eaf6ee6..9e45195 100644 --- a/backend/app/services/dataflow_engine.py +++ b/backend/app/services/dataflow_engine.py @@ -14,6 +14,9 @@ from datetime import datetime import ray import asyncio +import io +import sys +from contextlib import redirect_stdout, redirect_stderr logger = get_logger(__name__) @@ -334,17 +337,10 @@ def run(self, pipeline_config: Dict[str, Any], task_id: str, execution_path: Opt output: Dict[str, Any] = {} # ✅ 新增:按算子分组的日志 # ✅ 新增:stage -> operator -> logs - stage_operator_logs: Dict[str, Dict[str, List[str]]] = { - "global": {"__pipeline__": []}, - "init": {}, - "run": {}, - } + operator_logs: Dict[str, List[str]] = {} # keyed by op_name_index - # ✅ 新增:算子粒度状态追踪 - operator_progress: Dict[str, Dict[str, Any]] = { - "init": {}, - "run": {} - } + # ✅ 新增:算子粒度运行结果详情 + operators_detail: Dict[str, Dict[str, Any]] = {} # ✅ 新增:实时更新执行状态到文件 def update_execution_status(status: str = None, partial_output: Dict[str, Any] = None): @@ -359,11 +355,11 @@ def update_execution_status(status: str = None, partial_output: Dict[str, Any] = if status: data["tasks"][task_id]["status"] = status if partial_output: + if "output" not in data["tasks"][task_id]: + data["tasks"][task_id]["output"] = {} # 更新 output 中的内容 data["tasks"][task_id]["output"].update(partial_output) - # 同时更新顶层的 operator_progress 字段(用于 get_execution_status 查询) - if "operator_progress" in partial_output: - data["tasks"][task_id]["operator_progress"] = partial_output["operator_progress"] + with open(execution_path, "w", encoding="utf-8") as f: json.dump(data, f, indent=2) except Exception as e: @@ -371,14 +367,18 @@ def update_execution_status(status: str = None, partial_output: Dict[str, Any] = # ✅ 新增:统一写日志(同时写到全局 logs 和该算子的 logs) def add_log(stage: str, message: str, op_name: str = "__pipeline__"): - """同时写入:全局流水 logs + 分stage/分operator日志""" - ts_msg = message # 你也可以在这里统一加时间戳 + """同时写入:全局流水 logs + 分operator日志""" + # ts_msg = f"[{datetime.now().isoformat()}] {message}" # message already has timestamp mostly + ts_msg = message logs.append(ts_msg) - stage_operator_logs.setdefault(stage, {}).setdefault(op_name, []).append(ts_msg) + + # 如果是具体的算子,记录到 operator_logs + if op_name != "__pipeline__": + operator_logs.setdefault(op_name, []).append(ts_msg) add_log("global", f"[{started_at}] Starting pipeline execution: {task_id}") - logs.append(f"[{started_at}] Starting pipeline execution: {task_id}") logger.info(f"Starting pipeline execution: {task_id}") + try: # Step 1: 初始化 Storage @@ -452,8 +452,14 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): for op_idx, op in enumerate(operators): op_name = op.get("name", f"Operator_{op_idx}") - add_log("init", f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(operators)}] Initializing operator: {op_name}", op_name) - logs.append(f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(operators)}] Initializing operator: {op_name}") + op_key = f"{op_name}_{op_idx}" + operators_detail[op_key] = { + "name": op_name, + "index": op_idx, + "status": "initializing" + } + + add_log("init", f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(operators)}] Initializing operator: {op_name}", op_key) logger.info(f"[{op_idx+1}/{len(operators)}] Initializing operator: {op_name}") try: init_params = {} @@ -468,8 +474,7 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): if param_name == "llm_serving": serving_id = param_value logger.info(f"Operator {op_name}: initializing serving {serving_id}") - add_log("init", f"[{datetime.now().isoformat()}] - Initializing LLM serving: {serving_id}", op_name) - logs.append(f"[{datetime.now().isoformat()}] - Initializing LLM serving: {serving_id}") + add_log("init", f"[{datetime.now().isoformat()}] - Initializing LLM serving: {serving_id}", op_key) if serving_id not in serving_instance_map: serving_instance_map[serving_id] = self.init_serving_instance(serving_id) param_value = serving_instance_map[serving_id] @@ -477,8 +482,7 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): elif param_name == "embedding_serving": serving_id = param_value logger.info(f"Operator {op_name}: initializing embedding serving {serving_id}") - add_log("init", f"[{datetime.now().isoformat()}] - Initializing embedding serving: {serving_id}", op_name) - logs.append(f"[{datetime.now().isoformat()}] - Initializing embedding serving: {serving_id}") + add_log("init", f"[{datetime.now().isoformat()}] - Initializing embedding serving: {serving_id}", op_key) if serving_id not in embedding_serving_instance_map: embedding_serving_instance_map[serving_id] = self.init_serving_instance(serving_id, is_embedding=True) param_value = embedding_serving_instance_map[serving_id] @@ -498,8 +502,7 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): elif param_name == "prompt_template": prompt_cls_name = extract_class_name(param_value) - add_log("init", f"[{datetime.now().isoformat()}] - Loading prompt template: {prompt_cls_name}", op_name) - logs.append(f"[{datetime.now().isoformat()}] - Loading prompt template: {prompt_cls_name}") + add_log("init", f"[{datetime.now().isoformat()}] - Loading prompt template: {prompt_cls_name}", op_key) prompt_cls = PROMPT_REGISTRY.get(prompt_cls_name) if not prompt_cls: raise DataFlowEngineError( @@ -541,14 +544,17 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): ) operator_instance = operator_cls(**init_params) - run_op.append((operator_instance, run_params, op_name)) - add_log("init", f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(operators)}] {op_name} initialized successfully", op_name) - logs.append(f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(operators)}] {op_name} initialized successfully") + run_op.append((operator_instance, run_params, op_name, op_key)) + + operators_detail[op_key]["status"] = "initialized" + add_log("init", f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(operators)}] {op_name} initialized successfully", op_key) logger.info(f"Operator {op_name} initialized successfully") except DataFlowEngineError: + operators_detail[op_key]["status"] = "failed" raise except Exception as e: + operators_detail[op_key]["status"] = "failed" raise DataFlowEngineError( f"初始化Operator失败: {op_name}", context={ @@ -561,38 +567,88 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): # Step 3: 执行所有 Operators add_log("run", f"[{datetime.now().isoformat()}] Step 3: Executing {len(run_op)} operators...") - logs.append(f"[{datetime.now().isoformat()}] Step 3: Executing {len(run_op)} operators...") logger.info(f"Executing {len(run_op)} operators...") execution_results = [] - for op_idx, (operator, run_params, op_name) in enumerate(run_op): + for op_idx, (operator, run_params, op_name, op_key) in enumerate(run_op): try: run_params["storage"] = storage.step() - add_log("run", f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(run_op)}] Running operator: {op_name}", op_name) - logs.append(f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(run_op)}] Running operator: {op_name}") + add_log("run", f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(run_op)}] Running operator: {op_name}", op_key) logger.info(f"[{op_idx+1}/{len(run_op)}] Running {op_name}") logger.debug(f"Run params: {list(run_params.keys())}") # ✅ 更新算子粒度状态:开始执行 - op_key = f"{op_name}_{op_idx}" - operator_progress["run"].setdefault(op_key, []).append(f"[{datetime.now().isoformat()}] Started") - # ✅ 记录当前正在执行的 step - operator_progress["current_step"] = op_idx + operators_detail[op_key]["status"] = "running" + operators_detail[op_key]["started_at"] = datetime.now().isoformat() + # ✅ 实时更新状态到文件 - update_execution_status("running", {"operator_progress": operator_progress}) + update_execution_status("running", { + "operators_detail": operators_detail, + "operator_logs": operator_logs # Sync logs to file + }) api_pipeline_path = os.path.join(settings.DATAFLOW_CORE_DIR, "api_pipelines") - print(api_pipeline_path) os.chdir(api_pipeline_path) - operator.run(**run_params) + + # ✅ 捕获 stdout/stderr + f_stdout = io.StringIO() + f_stderr = io.StringIO() + + try: + with redirect_stdout(f_stdout), redirect_stderr(f_stderr): + operator.run(**run_params) + finally: + stdout_str = f_stdout.getvalue() + stderr_str = f_stderr.getvalue() + + if stdout_str: + for line in stdout_str.splitlines(): + if line.strip(): + add_log("run", f"[STDOUT] {line}", op_key) + if stderr_str: + for line in stderr_str.splitlines(): + if line.strip(): + add_log("run", f"[STDERR] {line}", op_key) + os.chdir(settings.BASE_DIR) - add_log("run", f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(run_op)}] {op_name} completed successfully", op_name) - logs.append(f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(run_op)}] {op_name} completed successfully") + # ✅ 获取处理后的数据量 + # 尝试从 output file 获取 + sample_count = 0 + storage_obj = run_params.get("storage") + + if storage_obj: + f_path = None + # 尝试使用 _get_cache_file_path 推断输出文件路径 + if hasattr(storage_obj, "_get_cache_file_path") and hasattr(storage_obj, "operator_step"): + try: + # operator_step 是输入 step,输出在 step + 1 + f_path = storage_obj._get_cache_file_path(storage_obj.operator_step + 1) + except Exception: + pass + + if f_path and os.path.exists(f_path): + try: + with open(f_path, 'r', encoding='utf-8') as f: + sample_count = sum(1 for _ in f) + except Exception as e: + logger.warning(f"Failed to count lines in {f_path}: {e}") + add_log("run", f"WARN: Failed to read output file: {e}", op_key) + + operators_detail[op_key]["sample_count"] = sample_count + add_log("run", f"Processed {sample_count} samples", op_key) + + add_log("run", f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(run_op)}] {op_name} completed successfully", op_key) logger.info(f"[{op_idx+1}/{len(run_op)}] {op_name} completed") # ✅ 更新算子粒度状态:执行完成 - operator_progress["run"].setdefault(op_key, []).append(f"[{datetime.now().isoformat()}] Completed") + operators_detail[op_key]["status"] = "completed" + operators_detail[op_key]["completed_at"] = datetime.now().isoformat() + update_execution_status("running", { + "operators_detail": operators_detail, + "operator_logs": operator_logs + }) + # ✅ 记录缓存文件信息 from app.core.config import settings @@ -607,8 +663,6 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): except Exception as e: logger.error(f"[Pipeline] Failed to read cache file: {e}") - # ✅ 实时更新状态到文件 - update_execution_status("running", {"operator_progress": operator_progress}) execution_results.append({ "operator": op_name, @@ -617,6 +671,12 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): }) except Exception as e: + operators_detail[op_key]["status"] = "failed" + operators_detail[op_key]["error"] = str(e) + update_execution_status("failed", { + "operators_detail": operators_detail, + "operator_logs": operator_logs + }) raise DataFlowEngineError( f"执行Operator失败: {op_name}", context={ @@ -630,13 +690,12 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): # os.chdir(settings.BASE_DIR) # 成功完成 completed_at = datetime.now().isoformat() - add_log("run", f"[{completed_at}] Pipeline execution completed successfully") - logs.append(f"[{completed_at}] Pipeline execution completed successfully") + add_log("global", f"[{completed_at}] Pipeline execution completed successfully") logger.info(f"Pipeline execution completed successfully: {task_id}") output["operators_executed"] = len(run_op) - output["stage_operator_logs"] = stage_operator_logs - output["operator_progress"] = operator_progress + output["operators_detail"] = operators_detail + output["operator_logs"] = operator_logs output["execution_results"] = execution_results output["success"] = True @@ -645,6 +704,7 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): "status": "completed", "output": output, "logs": logs, + "operator_logs": operator_logs, # Return structured logs "started_at": started_at, "completed_at": completed_at } @@ -653,23 +713,31 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): completed_at = datetime.now().isoformat() error_log = f"[{completed_at}] ERROR: {e.message}" error_op_name = e.context.get("operator") + # Try to find the op_key if possible, or just log generally + if error_op_name: - add_log("run", f"[{completed_at}] ERROR: {e.message}", error_op_name) - # ✅ 更新算子粒度状态:执行失败 - operator_progress["run"].setdefault(error_op_name, []).append(f"[{completed_at}] Failed: {e.message}") + # Find matching key in operators_detail or iterate + target_key = None + for k, v in operators_detail.items(): + if v["name"] == error_op_name: + target_key = k + break + + if target_key: + add_log("run", f"[{completed_at}] ERROR: {e.message}", target_key) + operators_detail[target_key]["status"] = "failed" + operators_detail[target_key]["error"] = e.message + logs.append(error_log) logger.error(f"Pipeline execution failed: {e.message}") logger.error(f"Context: {e.context}") - if e.original_error: - add_log("run", f"[{completed_at}] ERROR: {e.message}", error_op_name) # 返回失败结果 output["error"] = e.message output["error_context"] = e.context output["original_error"] = str(e.original_error) if e.original_error else None - output["stage_operator_logs"] = stage_operator_logs - output["operator_progress"] = operator_progress + output["operators_detail"] = operators_detail return { @@ -677,6 +745,7 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): "status": "failed", "output": output, "logs": logs, + "operator_logs": operator_logs, "started_at": started_at, "completed_at": completed_at } @@ -692,7 +761,7 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): # 返回失败结果 output["error"] = "Pipeline执行过程中发生未预期的错误" output["error_message"] = str(e) - output["stage_operator_logs"] = stage_operator_logs + output["operators_detail"] = operators_detail return { "task_id": task_id, diff --git a/backend/app/services/ray_pipeline_executor.py b/backend/app/services/ray_pipeline_executor.py index ed17cb8..8525977 100644 --- a/backend/app/services/ray_pipeline_executor.py +++ b/backend/app/services/ray_pipeline_executor.py @@ -12,6 +12,9 @@ import os from datetime import datetime import traceback +import io +import sys +from contextlib import redirect_stdout, redirect_stderr logger = get_logger(__name__) @@ -71,17 +74,10 @@ def dataflow_pipeline_execute(pipeline_config: Dict[str, Any], dataflow_runtime: global settings # ✅ 新增:按算子分组的日志 # ✅ 新增:stage -> operator -> logs - stage_operator_logs: Dict[str, Dict[str, List[str]]] = { - "global": {"__pipeline__": []}, - "init": {}, - "run": {}, - } + operator_logs: Dict[str, List[str]] = {} # keyed by op_name_index - # ✅ 新增:算子粒度状态追踪 - operator_progress: Dict[str, Dict[str, Any]] = { - "init": {}, - "run": {} - } + # ✅ 新增:算子粒度运行结果详情 + operators_detail: Dict[str, Dict[str, Any]] = {} # ✅ 新增:实时更新执行状态到文件 def update_execution_status(status: str = None, partial_output: Dict[str, Any] = None): @@ -96,11 +92,11 @@ def update_execution_status(status: str = None, partial_output: Dict[str, Any] = if status: data["tasks"][task_id]["status"] = status if partial_output: + if "output" not in data["tasks"][task_id]: + data["tasks"][task_id]["output"] = {} # 更新 output 中的内容 data["tasks"][task_id]["output"].update(partial_output) - # 同时更新顶层的 operator_progress 字段(用于 get_execution_status 查询) - if "operator_progress" in partial_output: - data["tasks"][task_id]["operator_progress"] = partial_output["operator_progress"] + with open(execution_path, "w", encoding="utf-8") as f: json.dump(data, f, indent=2) except Exception as e: @@ -108,10 +104,13 @@ def update_execution_status(status: str = None, partial_output: Dict[str, Any] = # ✅ 新增:统一写日志(同时写到全局 logs 和该算子的 logs) def add_log(stage: str, message: str, op_name: str = "__pipeline__"): - """同时写入:全局流水 logs + 分stage/分operator日志""" - ts_msg = message # 你也可以在这里统一加时间戳 + """同时写入:全局流水 logs + 分operator日志""" + ts_msg = message logs.append(ts_msg) - stage_operator_logs.setdefault(stage, {}).setdefault(op_name, []).append(ts_msg) + + # 如果是具体的算子,记录到 operator_logs + if op_name != "__pipeline__": + operator_logs.setdefault(op_name, []).append(ts_msg) logger.success(f"Input dataflow runtime: {dataflow_runtime}") @@ -155,8 +154,13 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): OPERATOR_REGISTRY._get_all() for op_idx, op in enumerate(operators): op_name = op.get("name", f"Operator_{op_idx}") - add_log("init", f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(operators)}] Initializing operator: {op_name}", op_name) - logs.append(f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(operators)}] Initializing operator: {op_name}") + op_key = f"{op_name}_{op_idx}" + operators_detail[op_key] = { + "name": op_name, + "index": op_idx, + "status": "initializing" + } + add_log("init", f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(operators)}] Initializing operator: {op_name}", op_key) logger.info(f"[{op_idx+1}/{len(operators)}] Initializing operator: {op_name}") try: init_params = {} @@ -171,8 +175,7 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): if param_name == "llm_serving": serving_id = param_value logger.info(f"Operator {op_name}: initializing serving {serving_id}") - add_log("init", f"[{datetime.now().isoformat()}] - Initializing LLM serving: {serving_id}", op_name) - logs.append(f"[{datetime.now().isoformat()}] - Initializing LLM serving: {serving_id}") + add_log("init", f"[{datetime.now().isoformat()}] - Initializing LLM serving: {serving_id}", op_key) if serving_id not in serving_instance_map: serving_info = dataflow_runtime['serving_map'][serving_id] params_dict = {} @@ -284,8 +287,7 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): elif param_name == "prompt_template": prompt_cls_name = extract_class_name(param_value) - add_log("init", f"[{datetime.now().isoformat()}] - Loading prompt template: {prompt_cls_name}", op_name) - logs.append(f"[{datetime.now().isoformat()}] - Loading prompt template: {prompt_cls_name}") + add_log("init", f"[{datetime.now().isoformat()}] - Loading prompt template: {prompt_cls_name}", op_key) prompt_cls = PROMPT_REGISTRY.get(prompt_cls_name) if not prompt_cls: raise DataFlowEngineError( @@ -327,14 +329,17 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): ) operator_instance = operator_cls(**init_params) - run_op.append((operator_instance, run_params, op_name)) - add_log("init", f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(operators)}] {op_name} initialized successfully", op_name) - logs.append(f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(operators)}] {op_name} initialized successfully") + run_op.append((operator_instance, run_params, op_name, op_key)) + + operators_detail[op_key]["status"] = "initialized" + add_log("init", f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(operators)}] {op_name} initialized successfully", op_key) logger.info(f"Operator {op_name} initialized successfully") except DataFlowEngineError: + operators_detail[op_key]["status"] = "failed" raise except Exception as e: + operators_detail[op_key]["status"] = "failed" raise DataFlowEngineError( f"初始化Operator失败: {op_name}", context={ @@ -345,38 +350,84 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): original_error=e ) add_log("run", f"[{datetime.now().isoformat()}] Step 3: Executing {len(run_op)} operators...") - logs.append(f"[{datetime.now().isoformat()}] Step 3: Executing {len(run_op)} operators...") logger.info(f"Executing {len(run_op)} operators...") execution_results = [] - for op_idx, (operator, run_params, op_name) in enumerate(run_op): + for op_idx, (operator, run_params, op_name, op_key) in enumerate(run_op): try: run_params["storage"] = storage.step() - add_log("run", f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(run_op)}] Running operator: {op_name}", op_name) - logs.append(f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(run_op)}] Running operator: {op_name}") + add_log("run", f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(run_op)}] Running operator: {op_name}", op_key) logger.info(f"[{op_idx+1}/{len(run_op)}] Running {op_name}") logger.debug(f"Run params: {list(run_params.keys())}") - # ✅ 更新算子粒度状态:开始执行 - op_key = f"{op_name}_{op_idx}" - operator_progress["run"].setdefault(op_key, []).append(f"[{datetime.now().isoformat()}] Started") - # ✅ 记录当前正在执行的 step - operator_progress["current_step"] = op_idx + # ✅ 更新算子粒度状态:开始执行 + operators_detail[op_key]["status"] = "running" + operators_detail[op_key]["started_at"] = datetime.now().isoformat() + # ✅ 实时更新状态到文件 - update_execution_status("running", {"operator_progress": operator_progress}) + update_execution_status("running", { + "operators_detail": operators_detail, + "operator_logs": operator_logs + }) api_pipeline_path = os.path.join(settings.DATAFLOW_CORE_DIR, "api_pipelines") print(api_pipeline_path) os.chdir(api_pipeline_path) - operator.run(**run_params) + + # ✅ 捕获 stdout/stderr + f_stdout = io.StringIO() + f_stderr = io.StringIO() + + try: + with redirect_stdout(f_stdout), redirect_stderr(f_stderr): + operator.run(**run_params) + finally: + stdout_str = f_stdout.getvalue() + stderr_str = f_stderr.getvalue() + + if stdout_str: + for line in stdout_str.splitlines(): + if line.strip(): + add_log("run", f"[STDOUT] {line}", op_key) + if stderr_str: + for line in stderr_str.splitlines(): + if line.strip(): + add_log("run", f"[STDERR] {line}", op_key) + os.chdir(settings.BASE_DIR) + + # ✅ 获取处理后的数据量 + # 尝试从 output file 获取 + sample_count = 0 + storage_obj = run_params.get("storage") + + if storage_obj: + f_path = None + # 尝试使用 _get_cache_file_path 推断输出文件路径 + if hasattr(storage_obj, "_get_cache_file_path") and hasattr(storage_obj, "operator_step"): + try: + # operator_step 是输入 step,输出在 step + 1 + f_path = storage_obj._get_cache_file_path(storage_obj.operator_step + 1) + except Exception: + pass + + if f_path and os.path.exists(f_path): + try: + with open(f_path, 'r', encoding='utf-8') as f: + sample_count = sum(1 for _ in f) + except Exception as e: + logger.warning(f"Failed to count lines in {f_path}: {e}") + add_log("run", f"WARN: Failed to read output file: {e}", op_key) - add_log("run", f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(run_op)}] {op_name} completed successfully", op_name) - logs.append(f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(run_op)}] {op_name} completed successfully") + operators_detail[op_key]["sample_count"] = sample_count + add_log("run", f"Processed {sample_count} samples", op_key) + + add_log("run", f"[{datetime.now().isoformat()}] [{op_idx+1}/{len(run_op)}] {op_name} completed successfully", op_key) logger.info(f"[{op_idx+1}/{len(run_op)}] {op_name} completed") # ✅ 更新算子粒度状态:执行完成 - operator_progress["run"].setdefault(op_key, []).append(f"[{datetime.now().isoformat()}] Completed") + operators_detail[op_key]["status"] = "completed" + operators_detail[op_key]["completed_at"] = datetime.now().isoformat() # ✅ 记录缓存文件信息 from app.core.config import settings @@ -392,7 +443,10 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): logger.error(f"[Pipeline] Failed to read cache file: {e}") # ✅ 实时更新状态到文件 - update_execution_status("running", {"operator_progress": operator_progress}) + update_execution_status("running", { + "operators_detail": operators_detail, + "operator_logs": operator_logs + }) execution_results.append({ "operator": op_name, @@ -401,6 +455,13 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): }) except Exception as e: + operators_detail[op_key]["status"] = "failed" + operators_detail[op_key]["error"] = str(e) + update_execution_status("failed", { + "operators_detail": operators_detail, + "operator_logs": operator_logs + }) + raise DataFlowEngineError( f"执行Operator失败: {op_name}", context={ @@ -414,13 +475,12 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): # os.chdir(settings.BASE_DIR) # 成功完成 completed_at = datetime.now().isoformat() - add_log("run", f"[{completed_at}] Pipeline execution completed successfully") - logs.append(f"[{completed_at}] Pipeline execution completed successfully") + add_log("global", f"[{completed_at}] Pipeline execution completed successfully") logger.info(f"Pipeline execution completed successfully: {task_id}") output["operators_executed"] = len(run_op) - output["stage_operator_logs"] = stage_operator_logs - output["operator_progress"] = operator_progress + output["operators_detail"] = operators_detail + output["operator_logs"] = operator_logs output["execution_results"] = execution_results output["success"] = True @@ -429,6 +489,7 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): "status": "completed", "output": output, "logs": logs, + "operator_logs": operator_logs, # Return structured logs "started_at": started_at, "completed_at": completed_at } @@ -438,22 +499,28 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): error_log = f"[{completed_at}] ERROR: {e.message}" error_op_name = e.context.get("operator") if error_op_name: - add_log("run", f"[{completed_at}] ERROR: {e.message}", error_op_name) - # ✅ 更新算子粒度状态:执行失败 - operator_progress["run"].setdefault(error_op_name, []).append(f"[{completed_at}] Failed: {e.message}") + # Find matching key in operators_detail or iterate + target_key = None + for k, v in operators_detail.items(): + if v["name"] == error_op_name: + target_key = k + break + + if target_key: + add_log("run", f"[{completed_at}] ERROR: {e.message}", target_key) + operators_detail[target_key]["status"] = "failed" + operators_detail[target_key]["error"] = e.message + logs.append(error_log) logger.error(f"Pipeline execution failed: {e.message}") logger.error(f"Context: {e.context}") - if e.original_error: - add_log("run", f"[{completed_at}] ERROR: {e.message}", error_op_name) # 返回失败结果 output["error"] = e.message output["error_context"] = e.context output["original_error"] = str(e.original_error) if e.original_error else None - output["stage_operator_logs"] = stage_operator_logs - output["operator_progress"] = operator_progress + output["operators_detail"] = operators_detail return { @@ -461,6 +528,7 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): "status": "failed", "output": output, "logs": logs, + "operator_logs": operator_logs, "started_at": started_at, "completed_at": completed_at } @@ -476,7 +544,7 @@ def add_log(stage: str, message: str, op_name: str = "__pipeline__"): # 返回失败结果 output["error"] = "Pipeline执行过程中发生未预期的错误" output["error_message"] = str(e) - output["stage_operator_logs"] = stage_operator_logs + output["operators_detail"] = operators_detail return { "task_id": task_id, diff --git a/backend/app/services/task_registry.py b/backend/app/services/task_registry.py index 0f5456f..51f04e7 100644 --- a/backend/app/services/task_registry.py +++ b/backend/app/services/task_registry.py @@ -259,18 +259,53 @@ def get_execution_status( if not execution_data: return None + # operator_progress is now deprecated, use operators_detail from output + output = execution_data.get("output", {}) + return { "task_id": task_id, "pipeline_id": execution_data.get("pipeline_id"), "pipeline_config": execution_data.get("pipeline_config"), "status": execution_data.get("status"), - "operator_progress": execution_data.get("operator_progress", {}), + "operators_detail": output.get("operators_detail", {}), + "operator_logs": output.get("operator_logs", {}), "logs": execution_data.get("logs", []), - "output": execution_data.get("output", {}), + # "output": output, # Output removed as requested to avoid duplication "started_at": execution_data.get("started_at"), "completed_at": execution_data.get("completed_at"), } + def get_execution_logs(self, task_id: str, operator_name: Optional[str] = None) -> List[str]: + """获取任务日志,可选过滤指定算子""" + data = self._read() + execution_data = data.get("tasks", {}).get(task_id) + if not execution_data: + return [] + + # 如果是查询全局日志/流水线日志 + if not operator_name: + return execution_data.get("logs", []) + + # Assuming we only care about completed logs or what's available. + output = execution_data.get("output", {}) + operator_logs = output.get("operator_logs", {}) + + # Try to find by operator name + target_logs = [] + + # First check structured logs + if operator_name in operator_logs: + return operator_logs[operator_name] + + # If not indexed by simple name, maybe key is op_key + # Try finding key ending with name or just name + for k, v in operator_logs.items(): + if k == operator_name or k.startswith(f"{operator_name}_"): + return v + + # Default fallback to searching in main logs? + return [] + def get_execution_result( self, task_id: str, @@ -300,7 +335,8 @@ def get_execution_result( # 获取执行结果和算子进度 execution_results = output.get("execution_results", []) - operator_progress = execution_data.get("operator_progress", {}) + operators_detail = output.get("operators_detail", {}) + operator_logs = output.get("operator_logs", {}) # 确定要查询的步骤索引 if step is None: @@ -308,11 +344,16 @@ def get_execution_result( if execution_results: step = execution_results[-1].get("index", 0) else: - # 如果没有完成的步骤,尝试从 operator_progress 获取当前正在运行的步骤 - current_step = operator_progress.get("current_step") - if current_step is not None: - # 使用当前正在运行的 step - step = current_step + # 尝试查找正在运行的步骤 + # Find operator with status 'running' in operators_detail + running_op = None + for op_key, op_info in operators_detail.items(): + if op_info.get("status") == "running": + running_op = op_info + break + + if running_op: + step = running_op.get("index", 0) else: step = 0 @@ -325,7 +366,7 @@ def get_execution_result( total_count = 0 file_exists = False - # 如果当前 step 的文件不存在,尝试读取上一步的文件(运行中时,当前 step 的文件还没写入) + # 如果当前 step 的文件不存在,尝试读取上一步的文件 if not os.path.exists(cache_file) and step > 0: cache_file = os.path.join(cache_path, f"{cache_file_prefix}_step{step-1}.jsonl") @@ -347,31 +388,14 @@ def get_execution_result( operator_name = None operator_status = None - # 首先尝试从 execution_results 获取(已完成的算子) - if step < len(execution_results): - operator_name = execution_results[step].get("operator") - operator_status = execution_results[step].get("status") - else: - # 如果 execution_results 中没有,尝试从 operator_progress 获取(正在运行的算子) - run_progress = operator_progress.get("run", {}) - if run_progress: - # 找到第一个正在运行的 operator(Started 但还没有 Completed) - current_operator_key = None - for op_key, op_logs in run_progress.items(): - if op_logs: - last_log = op_logs[-1] - if "Started" in last_log and "Completed" not in last_log: - current_operator_key = op_key - operator_status = "running" - break - elif "Completed" in last_log: - # 这个已经完成了,继续找下一个 - continue - - # 从 key 中提取 operator 名称(去掉 _idx 后缀) - if current_operator_key: - operator_name = current_operator_key.rsplit('_', 1)[0] - + # 从 operators_detail 中查找 + # Need to find entry with index == step + for op_key, op_val in operators_detail.items(): + if op_val.get("index") == step: + operator_name = op_val.get("name") + operator_status = op_val.get("status") + break + return { "task_id": task_id, "pipeline_id": execution_data.get("pipeline_id"), @@ -386,8 +410,10 @@ def get_execution_result( "file_exists": file_exists, "cache_file": cache_file, "logs": logs, + "operator_logs": operator_logs, "started_at": execution_data.get("started_at"), - "completed_at": execution_data.get("completed_at") + "completed_at": execution_data.get("completed_at"), + "operators_detail": operators_detail } async def start_execution_async(