
Conversation

shaohuzhang1 (Contributor)

perf: Optimize workflow logic


f2c-ci-robot bot commented Jan 8, 2025

Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected. Please follow our release note process to remove it.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.


f2c-ci-robot bot commented Jan 8, 2025

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@shaohuzhang1 shaohuzhang1 merged commit 3e327d5 into main Jan 8, 2025
4 checks passed
@shaohuzhang1 shaohuzhang1 deleted the pr@main@perf_workflow branch January 8, 2025 08:05
finally:
self.lock.release()
for r in result_list:
self.future_list.append(r.get('future'))

def run_chain(self, current_node, node_result_future=None):
if node_result_future is None:

The provided code implements a task execution framework with asynchronous operations using Python's concurrent.futures. There are, however, several recommendations and potential issues worth addressing:

Recommendations

  1. Concurrency Considerations: Ensure that the concurrency settings (like thread pool size) align with your workload. You may want to consider customizing the number of threads or processes if needed.

  2. Error Handling: Enhance error handling by logging exceptions properly instead of just returning True.

  3. Lock Management: Although self.lock is used to synchronize access to self.future_list, it seems redundant since you already have a method await_result() which should block until all tasks complete. If you still need to release locks explicitly after some operations, do so carefully to avoid deadlocks.

  4. Code Readability: Improve readability by adding comments where necessary and separating long lines into multiple ones for better understanding.

  5. Optimization: Avoid unnecessary locking during critical sections where blocking is required. Use context managers like with lock: when necessary.

  6. Task List Management: Instead of appending futures directly to future_list, consider keeping track of successful task completions separately to improve efficiency and reduce overhead.
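
The locking advice in recommendation 5 can be sketched as follows; `append_future` and the module-level `lock` are hypothetical names used only for illustration, not identifiers from this PR:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
future_list = []

def append_future(future):
    # "with lock:" releases the lock even if an exception is raised,
    # replacing the manual try/finally + lock.release() pattern.
    with lock:
        future_list.append(future)

with ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(lambda: 42)
    append_future(future)

print(len(future_list), future_list[0].result())  # prints "1 42"
```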

Issues to Address

  1. Potential Deadlock: There might be a deadlock scenario if another part of your code tries to acquire the same lock while executing this method.

  2. Edge Cases: Handle edge cases such as empty subtask lists gracefully without causing errors.

  3. Resource Leaks: Ensure that resources like file descriptors or network connections are released properly, especially if they are not being managed correctly within these functions.
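
One way to guard against the deadlock scenario in issue 1 is to acquire the lock with a timeout, so a blocked caller can back off instead of waiting forever. This is a minimal sketch; `append_result` is a hypothetical helper, not code from the PR:

```python
import threading

lock = threading.Lock()

def append_result(result_list, item, timeout=1.0):
    # acquire(timeout=...) returns False instead of blocking forever,
    # letting the caller retry or log a warning rather than deadlock.
    if not lock.acquire(timeout=timeout):
        return False
    try:
        result_list.append(item)
        return True
    finally:
        lock.release()

results = []
print(append_result(results, 'future-1'))  # prints "True": the lock was free
```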

Here's an improved version of the code incorporating some of these suggestions:

import concurrent.futures
from time import sleep

from ..tools import to_stream_response_simple

class TaskExecutor:
    def __init__(self, max_workers=None):
        self.future_list = []
        # Share one executor; size it to match the expected workload.
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)

    def run_chain_async(self, current_node, node_result_future):
        self.run_chain_manage(current_node, node_result_future)
        return to_stream_response_simple(self.await_result())

    def is_run(self, timeout=0.5):
        try:
            result = concurrent.futures.wait(self.future_list, timeout)
            # Still running if any future has not finished within the timeout.
            return len(result.not_done) > 0
        except Exception as e:
            print(f"An error occurred: {e}")
            return True

    def await_result(self):
        try:
            # Block until every submitted task completes; tasks may submit
            # new futures, so loop until the list drains.
            while self.future_list:
                done, not_done = concurrent.futures.wait(self.future_list)
                self.future_list = list(not_done)
            return "All tasks completed successfully"
        except Exception as e:
            print(f"Failed to retrieve results: {e}")
            return "Result retrieval failed"

    def run_chain_manage(self, current_node, node_result_future):
        if node_result_future is None:
            sorted_node_run_list = current_node.get_child_nodes()  # Example function call
            result_list = [{'node': node,
                            'future': self.executor.submit(self.run_chain_manage, node, None)}
                           for node in sorted_node_run_list]
            for r in result_list:
                self.future_list.append(r.get('future'))

    def run_chain(self, current_node, node_result_future=None):
        if node_result_future is None:
            sorted_node_run_list = current_node.get_child_nodes()
            future_map = {i: self.executor.submit(self.run_chain_manage, node, None)
                          for i, node in enumerate(sorted_node_run_list)}
            self.future_list.extend(future_map.values())
            # Poll until every child future has finished.
            while any(not future.done() for future in future_map.values()):
                sleep(0.1)
            return [future.result() for future in future_map.values()]
This version addresses most of the recommended improvements and handles edge cases more robustly. Make sure to adapt the example calls (get_child_nodes() and the executor configuration) to your actual use case.

shaohuzhang1 added a commit that referenced this pull request Jan 8, 2025
(cherry picked from commit 3e327d5)
wxg0103 pushed a commit that referenced this pull request Jan 13, 2025