Skip to content

feat: implement real-time log streaming for intermediate progress capture#52

Merged
MOLYHECI merged 1 commit intoOpenDCAI:devfrom
duanchy3:dev
Jan 26, 2026
Merged

feat: implement real-time log streaming for intermediate progress capture#52
MOLYHECI merged 1 commit intoOpenDCAI:devfrom
duanchy3:dev

Conversation

@duanchy3
Copy link
Copy Markdown
Contributor

No description provided.

Copilot AI review requested due to automatic review settings January 26, 2026 14:44
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements real-time log streaming capability to capture and parse intermediate progress from operator executions. The changes enable live progress tracking by introducing a custom LogStream class that extends io.StringIO to intercept and parse stdout/stderr in real-time, extracting progress bar information (particularly from tqdm-style output) and updating execution status with throttling to avoid excessive disk I/O.

Changes:

  • Added LogStream class that captures stdout/stderr in real-time and parses progress bar indicators
  • Replaced standard io.StringIO with LogStream instances for stdout and stderr redirection
  • Implemented progress percentage extraction and throttled status updates (0.5s interval)

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

now = time.time()
if now - self.last_update_time > self.update_interval:
self.update_func("running", {
"operators_detail": self.operators_detail,
Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicate dictionary key "operators_detail" in the update function call. The second occurrence will overwrite the first one, making the first key assignment redundant. Remove the duplicate key to maintain clean code.

Suggested change
"operators_detail": self.operators_detail,

Copilot uses AI. Check for mistakes.
Comment on lines +84 to +91
# Throttle disk updates
now = time.time()
if now - self.last_update_time > self.update_interval:
self.update_func("running", {
"operators_detail": self.operators_detail,
"operators_detail": self.operators_detail
})
self.last_update_time = now
Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The update_func is called frequently with shared mutable state (operators_detail, operator_logs). This could lead to race conditions if the write() method is called from multiple threads concurrently. While Python's GIL provides some protection, file I/O operations in update_execution_status release the GIL, potentially causing inconsistent reads/writes if multiple LogStream instances are active simultaneously. Consider adding synchronization mechanisms like threading.Lock if concurrent execution is possible.

Copilot uses AI. Check for mistakes.
Custom stream to capture stdout/stderr in real-time,
parse progress bars, and update execution status.
"""
def __init__(self, op_key: str, operators_detail: Dict, operator_logs: Dict, update_func: callable, add_log_func: callable):
Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type annotation 'callable' should be 'Callable' (capitalized) and imported from the typing module. This is the proper type hint for callable objects in Python. Update the import statement to include 'Callable' and change the type annotations accordingly.

Copilot uses AI. Check for mistakes.
def write(self, s: str):
# Write to internal buffer (standard StringIO behavior)
super().write(s)

Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _process_progress method performs regex search operations on every write call that contains '\r' or matches the progress pattern. For high-frequency writes (e.g., verbose logging), this could add noticeable overhead. Consider adding a simple check to skip processing if the text is too short (e.g., less than 3 characters) or doesn't contain digits, which are required for percentage matching anyway.

Suggested change
# Fast-path: avoid regex processing for very short strings or those without digits
if len(s) < 3 or not any(ch.isdigit() for ch in s):
return

Copilot uses AI. Check for mistakes.
Comment on lines +87 to +90
self.update_func("running", {
"operators_detail": self.operators_detail,
"operators_detail": self.operators_detail
})
Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If update_func (update_execution_status) raises an exception, it will propagate and potentially interrupt the stream writing. While the original update_execution_status function has try-except for file operations, it only logs errors. If an exception occurs here during stream processing, it could disrupt the operator execution. Consider wrapping the update_func call in a try-except block to ensure that failures in status updates don't break the log streaming functionality.

Suggested change
self.update_func("running", {
"operators_detail": self.operators_detail,
"operators_detail": self.operators_detail
})
try:
self.update_func("running", {
"operators_detail": self.operators_detail,
"operators_detail": self.operators_detail
})
except Exception:
logger.exception(
"Failed to update execution status for operator '%s' during progress update",
self.op_key,
)

Copilot uses AI. Check for mistakes.
Comment on lines +58 to +65
self.progress_pattern = re.compile(r'(\d+%\|)|(it/s)|(s/it)')
self.percentage_pattern = re.compile(r'(\d+(?:\.\d+)?)%')

def write(self, s: str):
# Write to internal buffer (standard StringIO behavior)
super().write(s)

if "\r" in s or self.progress_pattern.search(s):
Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The progress_pattern regex is used to decide whether to process progress, but the actual percentage extraction in _process_progress only uses percentage_pattern. This means if a line contains '\r' but no percentage (e.g., just a carriage return for cursor positioning), _process_progress will be called but won't do anything useful. The logic could be more efficient by checking for percentage presence first before calling _process_progress.

Suggested change
self.progress_pattern = re.compile(r'(\d+%\|)|(it/s)|(s/it)')
self.percentage_pattern = re.compile(r'(\d+(?:\.\d+)?)%')
def write(self, s: str):
# Write to internal buffer (standard StringIO behavior)
super().write(s)
if "\r" in s or self.progress_pattern.search(s):
self.percentage_pattern = re.compile(r'(\d+(?:\.\d+)?)%')
def write(self, s: str):
# Write to internal buffer (standard StringIO behavior)
super().write(s)
if self.percentage_pattern.search(s):

Copilot uses AI. Check for mistakes.
Comment on lines +41 to +94
class LogStream(io.StringIO):
"""
Custom stream to capture stdout/stderr in real-time,
parse progress bars, and update execution status.
"""
def __init__(self, op_key: str, operators_detail: Dict, operator_logs: Dict, update_func: callable, add_log_func: callable):
super().__init__()
self.op_key = op_key
self.operators_detail = operators_detail
self.operator_logs = operator_logs
self.update_func = update_func
self.add_log_func = add_log_func

self.last_update_time = 0
self.update_interval = 0.5 # Update at most every 0.5s

# Regex reusing from parse_and_clean_logs, but adaptable for fragments
self.progress_pattern = re.compile(r'(\d+%\|)|(it/s)|(s/it)')
self.percentage_pattern = re.compile(r'(\d+(?:\.\d+)?)%')

def write(self, s: str):
# Write to internal buffer (standard StringIO behavior)
super().write(s)

if "\r" in s or self.progress_pattern.search(s):
self._process_progress(s)

def _process_progress(self, text: str):
match = self.percentage_pattern.search(text)
if match:
try:
pct = float(match.group(1))
# Update in-memory dict
self.operators_detail[self.op_key]["progress_percentage"] = pct

# Also try to capture the full progress line for "progress" field
# If text contains "it/s" or "|", use it as description
if "|" in text:
# Clean up CRs for clean storage
clean_text = text.replace('\r', '').strip()
if clean_text:
self.operators_detail[self.op_key]["progress"] = clean_text[-100:] # Keep last 100 chars to avoid huge strings

# Throttle disk updates
now = time.time()
if now - self.last_update_time > self.update_interval:
self.update_func("running", {
"operators_detail": self.operators_detail,
"operators_detail": self.operators_detail
})
self.last_update_time = now
except ValueError:
pass

Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new LogStream class and its integration lack test coverage. Given that the codebase has comprehensive test coverage for other modules (test_pipeline_registry.py, test_task_registry.py, etc.), consider adding tests for the LogStream class to cover: 1) progress parsing with various tqdm-style formats, 2) throttling behavior (updates at most every 0.5s), 3) handling of malformed progress strings, 4) interaction with update_func and add_log_func, and 5) proper StringIO behavior preservation.

Copilot uses AI. Check for mistakes.
"operators_detail": self.operators_detail
})
self.last_update_time = now
except ValueError:
Copy link

Copilot AI Jan 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'except' clause does nothing but pass and there is no explanatory comment.

Suggested change
except ValueError:
except ValueError:
# If the percentage cannot be parsed from the progress text,
# silently ignore this update and leave the previous progress intact.

Copilot uses AI. Check for mistakes.
@MOLYHECI
Copy link
Copy Markdown
Collaborator

LGTM. Thanks!

@MOLYHECI MOLYHECI merged commit bdb6f8e into OpenDCAI:dev Jan 26, 2026
5 of 6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants