-
Notifications
You must be signed in to change notification settings - Fork 20
/
_subprocess_tools.py
147 lines (124 loc) · 4.6 KB
/
_subprocess_tools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""
Utilities for running subprocesses.
"""
import logging
import subprocess
import time
from subprocess import CompletedProcess
from typing import Callable, Dict, List, Optional, Union
import sarge
LOGGER = logging.getLogger(__name__)
def _safe_decode(output_bytes: bytes) -> str:
    """
    Convert raw bytes to text without ever raising a decoding error.

    The bytes are first interpreted as strict UTF-8. If that fails, the whole
    input is decoded as ASCII instead, with every non-ASCII byte rendered as
    a ``\\xNN`` escape sequence.
    """
    try:
        text = output_bytes.decode('utf-8', 'strict')
    except UnicodeDecodeError:
        # Not valid UTF-8: keep the ASCII characters as they are and show
        # every other byte as a backslash escape.
        text = output_bytes.decode('ascii', 'backslashreplace')
    return text
class _LineLogger:
    """
    Collects chunks of bytes and hands only complete lines to a callback.
    """

    def __init__(self, logger: Callable[[str], None]) -> None:
        """
        Args:
            logger: Called once per complete line with the decoded text of
                that line (the newline separator is not included).
        """
        self._logger = logger
        self._buffer = b''

    def log(self, data: bytes) -> None:
        """
        Add ``data`` to the pending bytes and emit every line it completes.

        Whatever follows the last newline stays buffered until more data
        arrives or ``flush`` is called.
        """
        pending = self._buffer + data
        finished, newline, remainder = pending.rpartition(b'\n')
        # Store the unfinished tail before invoking the callback, so the
        # buffer is already up to date if the callback raises.
        self._buffer = remainder
        if not newline:
            # No newline seen yet; everything stays buffered.
            return
        for raw_line in finished.split(b'\n'):
            self._logger(_safe_decode(raw_line))

    def flush(self) -> None:
        """
        Emit any buffered partial line and empty the buffer.
        """
        leftover = self._buffer
        if not leftover:
            return
        self._logger(_safe_decode(leftover))
        self._buffer = b''
def run_subprocess(
    args: List[str],
    log_output_live: bool,
    cwd: Optional[Union[bytes, str]] = None,
    env: Optional[Dict[str, str]] = None,
    pipe_output: bool = True,
) -> CompletedProcess:
    """
    Run a command in a subprocess.

    When output is piped and ``log_output_live`` is set, stdout lines are
    logged at debug level and stderr lines at warning level while the command
    is running.

    Args:
        args: See :py:func:`subprocess.run`.
        log_output_live: If `True`, log output live.
        cwd: See :py:func:`subprocess.run`.
        env: See :py:func:`subprocess.run`.
        pipe_output: If ``True``, pipes are opened to stdout and stderr.
            This means that the values of stdout and stderr will be in
            the returned ``subprocess.CompletedProcess`` and optionally
            sent to a logger, given ``log_output_live``.
            If ``False``, no output is sent to a logger and the values are
            not returned.

    Returns:
        See :py:func:`subprocess.run`.

    Raises:
        subprocess.CalledProcessError: See :py:func:`subprocess.run`.
        Exception: An exception was raised in getting the output from the call.
    """
    stdout_list = []  # type: List[bytes]
    stderr_list = []  # type: List[bytes]
    stdout_logger = _LineLogger(LOGGER.debug)
    stderr_logger = _LineLogger(LOGGER.warning)

    def _read_output(process: sarge.Pipeline) -> None:
        # Non-blocking reads: take whatever is available right now so that
        # neither stream can stall the polling loop below.
        stdout_line = process.stdout.read(block=False)
        stderr_line = process.stderr.read(block=False)
        if stdout_line:
            stdout_list.append(stdout_line)
            if log_output_live:
                stdout_logger.log(stdout_line)
        if stderr_line:
            stderr_list.append(stderr_line)
            if log_output_live:
                stderr_logger.log(stderr_line)

    # Bound before the ``try`` so that the cleanup handler below can tell
    # whether a pipeline was ever created. Without this, a failure while
    # starting the command would surface as an ``UnboundLocalError`` from the
    # handler instead of the real error.
    process = None
    try:
        if pipe_output:
            process = sarge.capture_both(args, cwd=cwd, env=env, async_=True)
            # Keep draining output until a command reports a return code.
            while all(
                command.returncode is None for command in process.commands
            ):
                _read_output(process=process)
                process.poll_all()
                time.sleep(0.05)
            # Pick up anything written between the last read and the exit.
            _read_output(process=process)
        else:
            process = sarge.run(args, cwd=cwd, env=env, async_=True)

        # Emit any trailing partial lines which did not end with a newline.
        stdout_logger.flush()
        stderr_logger.flush()
        # stderr/stdout are not readable anymore which usually means
        # that the child process has exited. However, the child
        # process has not been wait()ed for yet, i.e. it has not yet
        # been reaped. That is, its exit status is unknown. Read its
        # exit status
        process.wait()
    except Exception:  # pragma: no cover pylint: disable=broad-except
        # We clean up if there is an error while getting the output.
        # This may not happen while running tests so we ignore coverage.
        # If the pipeline was never created there is nothing to terminate.
        started = process.processes if process is not None else []
        for popen_process in started:
            if popen_process is None:
                # NOTE(review): sarge appears to report ``None`` for a
                # command which never started - confirm against sarge.
                continue
            # Attempt to give the subprocess(es) a chance to terminate.
            popen_process.terminate()
            try:
                popen_process.wait(1)
            except subprocess.TimeoutExpired:
                # If the process cannot terminate cleanly, we just kill it.
                popen_process.kill()
        raise

    stdout = b''.join(stdout_list) if pipe_output else None
    stderr = b''.join(stderr_list) if pipe_output else None
    if process.returncode != 0:
        raise subprocess.CalledProcessError(
            returncode=process.returncode,
            cmd=args,
            output=stdout,
            stderr=stderr,
        )
    return CompletedProcess(args, process.returncode, stdout, stderr)