Skip to content

Commit

Permalink
feat(cove_runner): Make the progress bar smoother (#38)
Browse files Browse the repository at this point in the history
Use the ["Submit and use as completed"][doc] pattern as described by Jason
Brownlee in "ThreadPoolExecutor in Python: The Complete Guide".

[doc]: https://superfastpython.com/threadpoolexecutor-in-python/#Submit_and_Use_as_Completed

Fixes #37
  • Loading branch information
iainelder committed Apr 26, 2022
1 parent 5ad6936 commit 0337849
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 12 deletions.
28 changes: 21 additions & 7 deletions botocove/cove_runner.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import logging
from concurrent import futures
from typing import Any, Callable
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterable, List

from tqdm import tqdm

from botocove.cove_host_account import CoveHostAccount
from botocove.cove_session import CoveSession
from botocove.cove_types import CoveFunctionOutput, CoveResults, CoveSessionInformation
from botocove.cove_types import CoveFunctionOutput, CoveSessionInformation

logger = logging.getLogger(__name__)

Expand All @@ -33,16 +33,23 @@ def __init__(
self.thread_workers = thread_workers

def run_cove_function(self) -> CoveFunctionOutput:
# Run decorated func with all valid sessions
with futures.ThreadPoolExecutor(max_workers=self.thread_workers) as executor:
completed: CoveResults = list(

# The "Submit and Use as Completed" pattern as described in
# "ThreadPoolExecutor in Python: The Complete Guide".
# https://superfastpython.com/threadpoolexecutor-in-python/#Submit_and_Use_as_Completed
with ThreadPoolExecutor(max_workers=self.thread_workers) as executor:
futures: List["Future[CoveSessionInformation]"] = [
executor.submit(self.cove_thread, s) for s in self.sessions
]
completed: List[CoveSessionInformation] = list(
tqdm(
executor.map(self.cove_thread, self.sessions),
_iterate_results_in_order_of_completion(futures),
total=len(self.sessions),
desc="Executing function",
colour="#ff69b4", # hotpink
)
)

successful_results = [
result for result in completed if not result["ExceptionDetails"]
]
Expand Down Expand Up @@ -78,3 +85,10 @@ def cove_thread(
raise
else:
return cove_session.format_cove_error(e)


def _iterate_results_in_order_of_completion(
jobs: List["Future[CoveSessionInformation]"],
) -> Iterable[CoveSessionInformation]:
for f in as_completed(jobs):
yield f.result()
7 changes: 2 additions & 5 deletions botocove/cove_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,9 @@ class CoveSessionInformation(TypedDict):
Region: Optional[str]


CoveResults = List[CoveSessionInformation]


class CoveFunctionOutput(TypedDict):
Results: CoveResults
Exceptions: CoveResults
Results: List[CoveSessionInformation]
Exceptions: List[CoveSessionInformation]


class CoveOutput(TypedDict):
Expand Down

0 comments on commit 0337849

Please sign in to comment.