Skip to content

Commit

Permalink
Pass stream workunit context to all callbacks calls instead of just p…
Browse files Browse the repository at this point in the history
…assing it in the last call. (pantsbuild#10340)

### Problem

After pantsbuild#10034, the context kwarg is only passed to the callback in the last call (where finished=True)

### Solution

Pass the context param on every call to the callback

### Result

Handlers/callbacks don't have to wait for the last call in order to call the context APIs in order to dump digests (for example)
  • Loading branch information
asherf authored and Eric-Arellano committed Jul 15, 2020
1 parent 76ebbc7 commit 9660b8f
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions src/python/pants/reporting/streaming_workunit_handler.py
Expand Up @@ -45,13 +45,18 @@ def __init__(
self.report_interval = report_interval_seconds
self.callbacks = callbacks
self._thread_runner: Optional[_InnerHandler] = None
self._context = StreamingWorkunitContext(_scheduler=self.scheduler)
# TODO(10092) The max verbosity should be a per-client setting, rather than a global setting.
self.max_workunit_verbosity = max_workunit_verbosity

def start(self) -> None:
if self.callbacks:
self._thread_runner = _InnerHandler(
self.scheduler, self.callbacks, self.report_interval, self.max_workunit_verbosity
scheduler=self.scheduler,
context=self._context,
callbacks=self.callbacks,
report_interval=self.report_interval,
max_workunit_verbosity=self.max_workunit_verbosity,
)
self._thread_runner.start()

Expand All @@ -67,7 +72,7 @@ def end(self) -> None:
workunits=workunits["completed"],
started_workunits=workunits["started"],
finished=True,
context=StreamingWorkunitContext(_scheduler=self.scheduler),
context=self._context,
)

@contextmanager
Expand All @@ -86,12 +91,14 @@ class _InnerHandler(threading.Thread):
def __init__(
self,
scheduler: Any,
context: StreamingWorkunitContext,
callbacks: Iterable[Callable],
report_interval: float,
max_workunit_verbosity: LogLevel,
):
super().__init__(daemon=True)
self.scheduler = scheduler
self._context = context
self.stop_request = threading.Event()
self.report_interval = report_interval
self.callbacks = callbacks
Expand All @@ -105,6 +112,7 @@ def run(self):
workunits=workunits["completed"],
started_workunits=workunits["started"],
finished=False,
context=self._context,
)
self.stop_request.wait(timeout=self.report_interval)

Expand Down

0 comments on commit 9660b8f

Please sign in to comment.