Skip to content

Commit

Permalink
Implement non-blocking output streamer
Browse files Browse the repository at this point in the history
- Capture Android emulator output during its startup
  • Loading branch information
rmartin16 committed Mar 8, 2024
1 parent 5616e14 commit 16c45fe
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 58 deletions.
23 changes: 15 additions & 8 deletions src/briefcase/integrations/android_sdk.py
@@ -1,7 +1,9 @@
from __future__ import annotations

import contextlib
import json
import os
import queue
import re
import shlex
import shutil
Expand Down Expand Up @@ -1309,6 +1311,12 @@ def start_emulator(
start_new_session=True,
)

emulator_streamer = self.tools.subprocess.stream_output_non_blocking(
label="Android emulator",
popen_process=emulator_popen,
capture_output=True,
)

# wrap AVD name in quotes since '@' is a special char in PowerShell
emulator_command = " ".join(
f'"{arg}"' if arg.startswith("@") else arg
Expand Down Expand Up @@ -1394,14 +1402,11 @@ def start_emulator(
"Emulator output log for startup failure",
prefix=self.name,
)
try:
# if the emulator exited, this should return its output immediately
self.tools.logger.info(emulator_popen.communicate(timeout=1)[0])
except subprocess.TimeoutExpired:
self.tools.logger.info(
"Briefcase failed to retrieve emulator output "
"(this is expected if the emulator is running)"
)
with contextlib.suppress(queue.Empty):
output = []
while not emulator_streamer.output_queue.empty():
output.append(emulator_streamer.output_queue.get_nowait())
self.tools.logger.info("".join(output))

# Provide troubleshooting steps if user gives up on the emulator starting
if isinstance(e, KeyboardInterrupt):
Expand All @@ -1418,6 +1423,8 @@ def start_emulator(
self.tools.logger.info(general_error_msg)

raise
finally:
emulator_streamer.stop_flag.set()

# Return the device ID and full name.
return device, full_name
Expand Down
161 changes: 111 additions & 50 deletions src/briefcase/integrations/subprocess.py
Expand Up @@ -4,6 +4,7 @@
import json
import operator
import os
import queue
import shlex
import subprocess
import sys
Expand Down Expand Up @@ -145,6 +146,95 @@ def inner(sub: Subprocess, args: SubprocessArgsT, *wrapped_args, **wrapped_kwarg
return inner


class PopenOutputStreamer(threading.Thread):
def __init__(
self,
label: str,
popen_process: subprocess.Popen,
logger: Log,
filter_func: Callable[[str], Iterator[str]] | None,
capture_output: bool = False,
):
"""Thread for streaming stdout for a Popen process.
:param label: Descriptive name for process
:param popen_process: Popen process to stream stdout
:param logger: logger for printing
:param filter_func: a callable that will be invoked on every line of output
that is streamed; see ``stream_output`` for details.
:param capture_output: Retain process output in ``output_queue`` via a
``queue.SimpleQueue`` instead of printing to console.
"""
super().__init__(name=f"{label} output streamer", daemon=True)

self.popen_process = popen_process
self.logger = logger
self.filter_func = filter_func
self.capture_output = capture_output

self.output_queue = queue.Queue(maxsize=1_000_000)
self.stop_flag = threading.Event()

def run(self):
"""Stream output for a Popen process."""
try:
while not self.stop_flag.is_set():
output_line = self.readline()

if output_line == "":
self.stop_flag.set()

else:
filtered_output, stop_streaming = self.filter(output_line)

for filtered_line in filtered_output:
if self.capture_output:
self.output_queue.put_nowait(filtered_line)
else:
self.logger.info(filtered_line)

if stop_streaming:
self.stop_flag.set()
except Exception as e:
self.logger.error(f"Error while streaming output: {type(e).__name__}: {e}")
self.logger.capture_stacktrace("Output thread")

def readline(self) -> str:
"""Read a line of output from the process while blocking.
Reading stdout always returns at least a newline unless the process is exiting
or already exited; in that case, an empty string, i.e. "", is returned.
"""
try:
return ensure_str(self.popen_process.stdout.readline())
except ValueError as e:
# Catch ValueError if stdout is unexpectedly closed; this can
# happen, for instance, if the user starts spamming CTRL+C.
if "I/O operation on closed file" in str(e):
self.logger.warning(
"WARNING: stdout was unexpectedly closed while streaming output"
)
return ""
else:
raise

def filter(self, output_line: str) -> tuple[list[str], bool]:
"""Run filter func over output from process."""
filtered_output = []
stop_streaming = False

if self.filter_func is not None:
try:
for filtered_line in self.filter_func(output_line.strip("\n")):
filtered_output.append(filtered_line)
except StopStreaming:
stop_streaming = True
else:
filtered_output.append(output_line)

return filtered_output, stop_streaming


class NativeAppContext(Tool):
"""A wrapper around subprocess for use as an app-bound tool."""

Expand Down Expand Up @@ -625,11 +715,11 @@ def stream_output(
that should be displayed to the user. Can raise StopStreaming to terminate
the output stream.
"""
output_streamer = threading.Thread(
name=f"{label} output streamer",
target=self._stream_output_thread,
args=(popen_process, filter_func),
daemon=True,
output_streamer = PopenOutputStreamer(
label=label,
popen_process=popen_process,
logger=self.tools.logger,
filter_func=filter_func,
)
try:
output_streamer.start()
Expand All @@ -653,52 +743,23 @@ def stream_output(
"Log stream hasn't terminated; log output may be corrupted."
)

def _stream_output_thread(
def stream_output_non_blocking(
self,
popen_process: subprocess.Popen,
filter_func: Callable[[str], Iterator[str]],
):
"""Stream output for a Popen process in a Thread.
:param popen_process: popen process to stream stdout
:param filter_func: a callable that will be invoked on every line
of output that is streamed; see ``stream_output`` for details.
"""
try:
while True:
try:
output_line = ensure_str(popen_process.stdout.readline())
except ValueError as e:
# Catch ValueError if stdout is unexpectedly closed; this can
# happen, for instance, if the user starts spamming CTRL+C.
if "I/O operation on closed file" in str(e):
self.tools.logger.warning(
"WARNING: stdout was unexpectedly closed while streaming output"
)
return
else:
raise

# readline should always return at least a newline (ie \n) UNLESS
# the underlying process is exiting/gone; then "" is returned.
if output_line:
if filter_func is not None:
try:
for filtered_output in filter_func(
output_line.rstrip("\n")
):
self.tools.logger.info(filtered_output)
except StopStreaming:
return
else:
self.tools.logger.info(output_line)
else:
return
except Exception as e:
self.tools.logger.error(
f"Error while streaming output: {e.__class__.__name__}: {e}"
)
self.tools.logger.capture_stacktrace("Output thread")
label: str,
popen_process: Popen,
capture_output: bool = False,
filter_func: Callable[[str], Iterator[str]] | None = None,
) -> PopenOutputStreamer:
"""Stream the output of a Popen process without blocking."""
output_streamer = PopenOutputStreamer(
label=label,
popen_process=popen_process,
logger=self.tools.logger,
filter_func=filter_func,
capture_output=capture_output,
)
output_streamer.start()
return output_streamer

def cleanup(self, label: str, popen_process: subprocess.Popen):
"""Clean up after a Popen process, gracefully terminating if possible; forcibly
Expand Down

0 comments on commit 16c45fe

Please sign in to comment.