
refactor: improve task execution logging with operator details and stdout capture#36

Merged
MOLYHECI merged 6 commits into OpenDCAI:backend from duanchy3:backend
Jan 14, 2026

Conversation

@duanchy3
Contributor

No description provided.

Copilot AI review requested due to automatic review settings January 12, 2026 06:12

Copilot AI left a comment


Pull request overview

This pull request refactors task execution logging to improve tracking and observability of pipeline operators. It replaces the deprecated operator_progress structure with a more detailed operators_detail structure, adds stdout/stderr capture for operators, and introduces a new API endpoint to retrieve operator-specific logs.

Changes:

  • Replaced operator_progress with operators_detail to track operator execution state with richer metadata (name, index, status, timestamps, sample_count, etc.)
  • Added stdout/stderr capture using io.StringIO and contextlib redirects during operator execution
  • Introduced get_execution_logs method and /execution/{task_id}/log API endpoint for retrieving global or operator-specific logs
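
As a rough sketch of how the new operator-scoped log lookup might behave (the `get_execution_logs` name comes from the PR description; the stored-data layout and the `"<name>_<index>"` key format are assumptions for illustration, not the PR's actual code):

```python
def get_execution_logs(execution_data, operator_name=None):
    """Return global logs, or only one operator's logs when a name is given.

    Assumed layout: a top-level "logs" list plus per-operator entries
    under "operator_logs", keyed like "DataFilter_0".
    """
    if operator_name is None:
        return execution_data.get("logs", [])
    operator_logs = execution_data.get("operator_logs", {})
    for key, logs in operator_logs.items():
        if key == operator_name or key.startswith(f"{operator_name}_"):
            return logs
    return []
```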

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 18 comments.

File Description
backend/app/services/task_registry.py Updated methods to use operators_detail instead of operator_progress; added new get_execution_logs method for retrieving filtered logs
backend/app/services/dataflow_engine.py Refactored logging structure; added stdout/stderr capture; replaced operator_progress tracking with operators_detail; added sample count tracking
backend/app/api/v1/endpoints/tasks.py Added new /execution/{task_id}/log GET endpoint for retrieving task logs with optional operator filtering
Comments suppressed due to low confidence (2)

backend/app/services/dataflow_engine.py:753

  • The operator_logs field is missing from the return dictionary in the generic exception handler. For consistency with the other return statements (lines 687 and 728), this should include "operator_logs": operator_logs to ensure structured logs are always returned regardless of the error type.
            return {
                "task_id": task_id,
                "status": "failed",
                "output": output,
                "logs": logs,
                "started_at": started_at,
                "completed_at": completed_at
            }

backend/app/services/dataflow_engine.py:366

  • The update_execution_status function has a potential race condition. It reads the entire file, modifies it, and writes it back without any locking mechanism. If multiple processes or threads try to update the execution status simultaneously, updates could be lost. Consider using file locking (e.g., fcntl on Unix or msvcrt on Windows) or a database for concurrent access, or at least add error handling for concurrent modification scenarios.
        def update_execution_status(status: str = None, partial_output: Dict[str, Any] = None):
            """Write the current execution status to the file in real time"""
            if not execution_path:
                return
            try:
                import json
                with open(execution_path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                if task_id in data.get("tasks", {}):
                    if status:
                        data["tasks"][task_id]["status"] = status
                    if partial_output:
                        if "output" not in data["tasks"][task_id]:
                            data["tasks"][task_id]["output"] = {}
                        # Merge the partial output into the stored output
                        data["tasks"][task_id]["output"].update(partial_output)

                    with open(execution_path, "w", encoding="utf-8") as f:
                        json.dump(data, f, indent=2)
            except Exception as e:
                logger.error(f"Failed to update execution status: {e}")
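
One way to mitigate the race is the advisory file locking the reviewer mentions. A minimal Unix-only sketch using `fcntl.flock` (the function name and file layout here are illustrative, not the PR's code):

```python
import fcntl
import json

def update_status_locked(path: str, task_id: str, status: str) -> None:
    """Read-modify-write the execution file under an exclusive advisory lock."""
    with open(path, "r+", encoding="utf-8") as f:
        fcntl.flock(f, fcntl.LOCK_EX)  # blocks until the lock is acquired
        try:
            data = json.load(f)
            data.setdefault("tasks", {}).setdefault(task_id, {})["status"] = status
            f.seek(0)
            json.dump(data, f, indent=2)
            f.truncate()  # the new JSON may be shorter than the old contents
        finally:
            fcntl.flock(f, fcntl.LOCK_UN)
```

Note this only protects against other cooperating processes that also take the lock; it is not a substitute for a database if updates become frequent.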


Comment on lines +589 to +607
# ✅ Capture stdout/stderr
f_stdout = io.StringIO()
f_stderr = io.StringIO()

try:
    with redirect_stdout(f_stdout), redirect_stderr(f_stderr):
        operator.run(**run_params)
finally:
    stdout_str = f_stdout.getvalue()
    stderr_str = f_stderr.getvalue()

    if stdout_str:
        for line in stdout_str.splitlines():
            if line.strip():
                add_log("run", f"[STDOUT] {line}", op_key)
    if stderr_str:
        for line in stderr_str.splitlines():
            if line.strip():
                add_log("run", f"[STDERR] {line}", op_key)

Copilot AI Jan 12, 2026


Capturing stdout/stderr to StringIO for operators that produce large amounts of output could lead to excessive memory consumption. If an operator prints megabytes or gigabytes of data, this will all be held in memory. Consider:

  1. Adding a size limit on captured output
  2. Writing output directly to a file instead of holding it in memory
  3. Truncating very long output with a warning
  4. Documenting that operators should use logging instead of print statements for production use
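
A minimal sketch of option 1, assuming a plain `io.StringIO` subclass is acceptable (the class name and the default cap are illustrative):

```python
import io

class CappedStringIO(io.StringIO):
    """StringIO that stops recording after max_chars and flags truncation."""

    def __init__(self, max_chars: int = 1_000_000):
        super().__init__()
        self.max_chars = max_chars
        self.truncated = False

    def write(self, s: str) -> int:
        remaining = self.max_chars - self.tell()
        if remaining <= 0:
            self.truncated = True
            return len(s)  # report success so print() keeps working
        if len(s) > remaining:
            self.truncated = True
            s = s[:remaining]
        return super().write(s)
```

An operator that prints gigabytes would then hold at most `max_chars` characters in memory, and the caller can append a `[TRUNCATED]` marker to the logs when `truncated` is set.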


# Assuming we only care about completed logs or what's available.
output = execution_data.get("output", {})
operator_logs = output.get("operator_logs", {})

Copilot AI Jan 12, 2026


The method is looking for operator_logs inside the output dictionary, but according to the dataflow_engine.py changes (lines 687 and 728), operator_logs is stored at the root level of the execution data, not inside output. This should be:

operator_logs = execution_data.get("operator_logs", {})

instead of:

operator_logs = output.get("operator_logs", {})
Suggested change
operator_logs = output.get("operator_logs", {})
operator_logs = execution_data.get("operator_logs", {})

Comment thread backend/app/services/dataflow_engine.py Outdated
Comment on lines +615 to +622
    sample_count = run_params["storage"].get_data_count()
# Or try to read it from the output file (if the storage is a FileStorage)
elif "storage" in run_params and hasattr(run_params["storage"], "file_name"):
    try:
        with open(run_params["storage"].file_name, 'r') as f:
            sample_count = sum(1 for _ in f)
    except:
        pass

Copilot AI Jan 12, 2026


Inconsistent indentation: This line uses 5 spaces for indentation instead of 4. It should align with the try statement above.

Suggested change
sample_count = run_params["storage"].get_data_count()
# 或者尝试从 output file 获取 (如果 storage 是 FileStorage)
elif "storage" in run_params and hasattr(run_params["storage"], "file_name"):
try:
with open(run_params["storage"].file_name, 'r') as f:
sample_count = sum(1 for _ in f)
except:
pass
sample_count = run_params["storage"].get_data_count()
# 或者尝试从 output file 获取 (如果 storage 是 FileStorage)
elif "storage" in run_params and hasattr(run_params["storage"], "file_name"):
try:
with open(run_params["storage"].file_name, 'r') as f:
sample_count = sum(1 for _ in f)
except:
pass

Comment thread backend/app/services/dataflow_engine.py Outdated
Comment on lines +615 to +622
    sample_count = run_params["storage"].get_data_count()
# Or try to read it from the output file (if the storage is a FileStorage)
elif "storage" in run_params and hasattr(run_params["storage"], "file_name"):
    try:
        with open(run_params["storage"].file_name, 'r') as f:
            sample_count = sum(1 for _ in f)
    except:
        pass

Copilot AI Jan 12, 2026


Inconsistent indentation: This line uses 5 spaces for indentation instead of 4. It should align with the except statement above.

Suggested change (indentation normalized to 4 spaces)
    sample_count = run_params["storage"].get_data_count()
# Or try to read it from the output file (if the storage is a FileStorage)
elif "storage" in run_params and hasattr(run_params["storage"], "file_name"):
    try:
        with open(run_params["storage"].file_name, 'r') as f:
            sample_count = sum(1 for _ in f)
    except:
        pass

Comment thread backend/app/services/dataflow_engine.py Outdated
try:
    with open(run_params["storage"].file_name, 'r') as f:
        sample_count = sum(1 for _ in f)
except:

Copilot AI Jan 12, 2026


Using a bare except clause is a bad practice as it catches all exceptions including system exits and keyboard interrupts. Consider catching specific exceptions like IOError, OSError, or at minimum use 'except Exception:' to avoid catching BaseException subclasses.

Suggested change
except:
except Exception:

Comment thread backend/app/services/dataflow_engine.py Outdated
Comment on lines +615 to +622
    sample_count = run_params["storage"].get_data_count()
# Or try to read it from the output file (if the storage is a FileStorage)
elif "storage" in run_params and hasattr(run_params["storage"], "file_name"):
    try:
        with open(run_params["storage"].file_name, 'r') as f:
            sample_count = sum(1 for _ in f)
    except:
        pass

Copilot AI Jan 12, 2026


Inconsistent indentation: This line uses 5 spaces for indentation instead of 4. It should align with the with statement above.

Suggested change
sample_count = run_params["storage"].get_data_count()
# 或者尝试从 output file 获取 (如果 storage 是 FileStorage)
elif "storage" in run_params and hasattr(run_params["storage"], "file_name"):
try:
with open(run_params["storage"].file_name, 'r') as f:
sample_count = sum(1 for _ in f)
except:
pass
sample_count = run_params["storage"].get_data_count()
# 或者尝试从 output file 获取 (如果 storage 是 FileStorage)
elif "storage" in run_params and hasattr(run_params["storage"], "file_name"):
try:
with open(run_params["storage"].file_name, 'r') as f:
sample_count = sum(1 for _ in f)
except:
pass

"started_at": execution_data.get("started_at"),
"completed_at": execution_data.get("completed_at")
"completed_at": execution_data.get("completed_at"),
"operators_detail": operators_detail

Copilot AI Jan 12, 2026


Trailing whitespace detected at the end of this line. Remove the trailing space for cleaner code.

Comment on lines +300 to +303
# Try finding key ending with name or just name
for k, v in operator_logs.items():
    if k == operator_name or k.startswith(f"{operator_name}_"):
        return v

Copilot AI Jan 12, 2026


The matching logic may be too loose. Operator logs are stored under keys like "<name>_<index>" (e.g. "DataFilter_0"), so querying with operator_name="DataFilter" correctly matches "DataFilter_0". However, the bare startswith check also lets "DataFilter" match an unrelated operator with a similar name, such as "DataFilter_Extended_0". Consider requiring a numeric index after the underscore, or using an exact match with the index appended.

Suggested change
# Try finding key ending with name or just name
for k, v in operator_logs.items():
    if k == operator_name or k.startswith(f"{operator_name}_"):
        return v
# Try finding key matching name or name with numeric index suffix (e.g. "op_0")
prefix = f"{operator_name}_"
for k, v in operator_logs.items():
    if k == operator_name:
        return v
    if k.startswith(prefix):
        index_part = k[len(prefix):]
        if index_part.isdigit():
            return v
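
Wrapped as a standalone helper (a sketch; the function name is illustrative and the key format "<name>_<index>" is assumed from the review discussion), the stricter matching might look like:

```python
def find_operator_logs(operator_logs, operator_name):
    """Return logs for an exact key match or a numeric-index suffix match.

    "DataFilter" matches "DataFilter" and "DataFilter_0", but not the
    similarly named "DataFilter_Extended_0".
    """
    prefix = f"{operator_name}_"
    for key, logs in operator_logs.items():
        if key == operator_name:
            return logs
        if key.startswith(prefix) and key[len(prefix):].isdigit():
            return logs
    return None
```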

Comment thread backend/app/services/dataflow_engine.py Outdated
Comment on lines +615 to +622
    sample_count = run_params["storage"].get_data_count()
# Or try to read it from the output file (if the storage is a FileStorage)
elif "storage" in run_params and hasattr(run_params["storage"], "file_name"):
    try:
        with open(run_params["storage"].file_name, 'r') as f:
            sample_count = sum(1 for _ in f)
    except:
        pass

Copilot AI Jan 12, 2026


Inconsistent indentation: This line uses 5 spaces for indentation instead of 4. It should align with the elif statement above.

Suggested change
sample_count = run_params["storage"].get_data_count()
# 或者尝试从 output file 获取 (如果 storage 是 FileStorage)
elif "storage" in run_params and hasattr(run_params["storage"], "file_name"):
try:
with open(run_params["storage"].file_name, 'r') as f:
sample_count = sum(1 for _ in f)
except:
pass
sample_count = run_params["storage"].get_data_count()
# 或者尝试从 output file 获取 (如果 storage 是 FileStorage)
elif "storage" in run_params and hasattr(run_params["storage"], "file_name"):
try:
with open(run_params["storage"].file_name, 'r') as f:
sample_count = sum(1 for _ in f)
except:
pass

Comment thread backend/app/services/dataflow_engine.py Outdated
Comment on lines +621 to +622
except:
    pass

Copilot AI Jan 12, 2026


'except' clause does nothing but pass and there is no explanatory comment.

Suggested change
except:
    pass
except (OSError, IOError) as e:
    logger.warning(
        "Failed to read sample count from file '%s': %s",
        getattr(run_params["storage"], "file_name", "<unknown>"),
        e,
    )

@MOLYHECI
Collaborator

LGTM. Thank you very much!

@MOLYHECI MOLYHECI merged commit aefb1ac into OpenDCAI:backend Jan 14, 2026