ExecutorX is an advanced executor library for Python that extends concurrent.futures.Executor with new capabilities, including task progress tracking, submission throttling, modular lifecycle hooks, and improved support for worker initialization. Whether you're running CPU-bound tasks in processes, IO-bound tasks in threads, or debugging in the main thread, ExecutorX makes parallelism more flexible and powerful.
- ProcessPoolExecutor and ThreadPoolExecutor: Advanced executors with support for addons, task tracking, and flexible worker initialization.
- ImmediateExecutor: Executes tasks synchronously in the main thread for debugging and testing.
- Progress Tracking: Monitor task completion and pending tasks in real time.
- Submission Throttling: Limit the number of concurrent tasks or submission rate to prevent overloading workers.
- Worker Initialization: Define custom initialization logic for workers (e.g., resource setup).
- Worker Identification: Assign and track unique worker IDs.
- Lifecycle Addons: Extend executor behavior with modular addons for logging, monitoring, or custom task logic.
- Support for spawn Context: Ensures compatibility with platforms like Windows, which lack support for the fork multiprocessing context.
- Graceful Shutdown: Ensures all tasks complete before the executor shuts down, even when using custom addons.
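To make the progress-tracking and throttling ideas in the list above concrete, here is a minimal sketch built only on the standard library's concurrent.futures and threading modules. It is not how ExecutorX implements these features; the class name ThrottledTracker, its max_in_flight parameter, and the run_demo helper are hypothetical and exist only for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ThrottledTracker:
    """Hypothetical helper: caps in-flight tasks and counts completions."""

    def __init__(self, executor, max_in_flight):
        self._executor = executor
        self._slots = threading.BoundedSemaphore(max_in_flight)
        self._lock = threading.Lock()
        self.submitted = 0
        self.completed = 0

    def submit(self, fn, *args, **kwargs):
        # Block the caller until a slot is free (submission throttling).
        self._slots.acquire()
        with self._lock:
            self.submitted += 1
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except BaseException:
            # Undo the bookkeeping if the underlying executor refuses the task.
            with self._lock:
                self.submitted -= 1
            self._slots.release()
            raise
        future.add_done_callback(self._on_done)
        return future

    def _on_done(self, future):
        # Called once the task has finished, failed, or been cancelled.
        with self._lock:
            self.completed += 1
        self._slots.release()

    @property
    def pending(self):
        with self._lock:
            return self.submitted - self.completed


def square(x):
    return x * x


def run_demo():
    with ThreadPoolExecutor(max_workers=4) as pool:
        tracker = ThrottledTracker(pool, max_in_flight=2)
        futures = [tracker.submit(square, i) for i in range(10)]
        results = [f.result() for f in futures]
    # Leaving the with-block joins the worker threads, so every
    # done-callback has run by the time the counters are read here.
    return results, tracker.completed, tracker.pending
```

The semaphore limits how many tasks are outstanding at once, while the done-callback keeps the completed and pending counts current. An addon-based design like the one ExecutorX describes would presumably attach similar logic to its lifecycle hooks instead of wrapping the executor.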
Install ExecutorX with pip:

    pip install ExecutorX

The following example runs tasks in a process pool with progress tracking:

    from executorx.addons.progress import ProgressAddon
    from executorx.futures.executors import ProcessPoolExecutor


    def my_function(x):
        return x * x


    # The __main__ guard keeps worker processes from re-running this block
    # when the spawn start method re-imports the module.
    if __name__ == "__main__":
        # Initialize a process pool with progress tracking
        executor = ProcessPoolExecutor(max_workers=4, addons=[ProgressAddon])

        # Submit tasks
        futures = [executor.submit(my_function, i) for i in range(10)]

        # Collect results
        for future in futures:
            print(future.result())

        executor.shutdown()

The ImmediateExecutor runs all tasks in the main thread synchronously. This is useful for debugging or environments where parallelism isn't required.

    from executorx.futures.executors import ImmediateExecutor


    def debug_task(x):
        print(f"Processing {x} in the main thread")
        return x * x


    executor = ImmediateExecutor()

    # Submit tasks
    futures = [executor.submit(debug_task, i) for i in range(5)]

    # Collect results
    for future in futures:
        print(future.result())

    executor.shutdown()

Control the number of concurrent or submitted tasks using the ThrottleAddon.

    from executorx.addons.throttle import ThrottleAddon
    from executorx.futures.executors import ThreadPoolExecutor

    executor = ThreadPoolExecutor(
        max_workers=4, addons=[ThrottleAddon(max_concurrent_tasks=2)]
    )

    # Submit tasks (at most 2 will run concurrently)
    futures = [executor.submit(print, f"Task {i}") for i in range(10)]

    executor.shutdown()

Set up custom initialization logic for workers, such as loading shared resources.

    from executorx.futures.executors import ProcessPoolExecutor


    def initialize_worker():
        print("Initializing worker...")


    # The __main__ guard keeps worker processes from re-running this block
    # when the spawn start method re-imports the module.
    if __name__ == "__main__":
        executor = ProcessPoolExecutor(max_workers=4, initializer=initialize_worker)
        executor.submit(print, "Task executed with worker initialization")
        executor.shutdown()

Addons provide modular hooks into the executor lifecycle, allowing custom logic for task submission, initialization, and shutdown.

    from executorx.foundation.addon import PoolExecutorAddon
    from executorx.futures.executors import ThreadPoolExecutor


    class MyCustomAddon(PoolExecutorAddon):
        def before_submit(self):
            print("Preparing to submit task")

        def after_submit(self, future):
            print("Task submitted")


    # Use the addon
    executor = ThreadPoolExecutor(max_workers=4, addons=[MyCustomAddon])
    executor.submit(print, "Hello from ExecutorX!")
    executor.shutdown()

While setting max_workers=0 in a process or thread pool allows single-threaded execution, ImmediateExecutor provides distinct advantages:
- Code Clarity: It explicitly communicates the intent to disable parallelism while keeping the API consistent with other executors.
- Debugging Support: Addons, worker initialization, and task tracking remain functional, making it easy to debug parallelized code in a single-threaded environment.
- Feature Parity: ImmediateExecutor retains all features of other executors (e.g., addons, lifecycle management), ensuring that testing environments mirror production.
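For readers curious how an executor can run tasks inline while keeping the familiar submit/result API, the sketch below subclasses the standard library's concurrent.futures.Executor. It is an illustration of the general idea only, not ExecutorX's ImmediateExecutor; the name SyncExecutor is hypothetical, and the sketch omits addons and worker initialization entirely.

```python
from concurrent.futures import Executor, Future


class SyncExecutor(Executor):
    """Hypothetical stand-in that runs each task inline in the calling thread."""

    def __init__(self):
        self._shutdown = False

    def submit(self, fn, /, *args, **kwargs):
        if self._shutdown:
            raise RuntimeError("cannot submit after shutdown")
        future = Future()
        # Mark the future as running so it can no longer be cancelled.
        future.set_running_or_notify_cancel()
        try:
            result = fn(*args, **kwargs)
        except Exception as exc:
            # Store the error; future.result() re-raises it for the caller.
            future.set_exception(exc)
        else:
            future.set_result(result)
        return future

    def shutdown(self, wait=True, *, cancel_futures=False):
        # Nothing to wait for: every task finished inside submit().
        self._shutdown = True
```

Because submit() returns an already-completed Future, code written against the executor interface (including the inherited map() method and the with-statement) behaves the same way, while stack traces and breakpoints stay in a single thread.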
Contributions are welcome! To get started:
- Fork the repository.
- Create a feature branch (git checkout -b feature-name).
- Commit your changes (git commit -am 'Add feature-name').
- Push to the branch (git push origin feature-name).
- Create a pull request.
ExecutorX is licensed under the GNU Lesser General Public License (LGPL) v2.1. See the LICENSE file for details.
- Picklable Executors: Allow executors to be serialized and passed across processes.