forked from simplistix/testfixtures
/
popen.py
282 lines (235 loc) · 9.68 KB
/
popen.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
import pipes
from functools import wraps, partial
from io import TextIOWrapper
from itertools import chain
from subprocess import STDOUT, PIPE
from tempfile import TemporaryFile
from testfixtures.compat import basestring, PY3, zip_longest, reduce, PY2
from testfixtures.utils import extend_docstring
from .mock import Mock, call
def shell_join(command):
    """
    Return ``command`` as a single command-line string.

    A string is handed back untouched. Anything else is treated as an
    iterable of parts: each part is quoted with :func:`pipes.quote` and the
    quoted parts are joined with single spaces.
    """
    if isinstance(command, basestring):
        return command
    quoted_parts = [pipes.quote(part) for part in command]
    return " ".join(quoted_parts)
class PopenBehaviour(object):
    """
    The canned outcome a :class:`MockPopen` uses when simulating one
    particular command.

    Each constructor argument is stored, unchanged, on the attribute of the
    same name: ``stdout``, ``stderr``, ``returncode``, ``pid`` and
    ``poll_count``.
    """

    def __init__(self, stdout=b'', stderr=b'', returncode=0, pid=1234,
                 poll_count=3):
        # Simulated output streams.
        self.stdout, self.stderr = stdout, stderr
        # Simulated exit status and process id.
        self.returncode, self.pid = returncode, pid
        # How many polls report "still running" before the return code shows.
        self.poll_count = poll_count
def record(func):
    """
    Decorate a method so that each call is logged before it runs.

    The wrapper first calls ``self._record`` with a one-element tuple holding
    the wrapped function's name, followed by the call's positional and keyword
    arguments, and then returns whatever the wrapped function returns.
    """
    def logging_wrapper(self, *args, **kw):
        # Log first, so the call is recorded even if the method then raises.
        self._record((func.__name__,), *args, **kw)
        return func(self, *args, **kw)

    return wraps(func)(logging_wrapper)
class MockPopenInstance(object):
    """
    A mock process as returned by :class:`MockPopen`.

    Every simulated method call is recorded on this instance's own mock, on
    the ``Popen_instance`` mock of the creating :class:`MockPopen`, in
    :attr:`calls` and in the shared ``all_calls`` list.
    """

    #: A :class:`~unittest.mock.Mock` representing the pipe into this process.
    #: This is only set if ``stdin=PIPE`` is passed to the constructor.
    #: The mock records writes and closes in :attr:`MockPopen.all_calls`.
    stdin = None

    #: A file representing standard output from this process.
    stdout = None

    #: A file representing error output from this process.
    stderr = None

    def __init__(self, mock_class, root_call,
                 args, bufsize=0, executable=None,
                 stdin=None, stdout=None, stderr=None,
                 preexec_fn=None, close_fds=False, shell=False, cwd=None,
                 env=None, universal_newlines=False,
                 startupinfo=None, creationflags=0, restore_signals=True,
                 start_new_session=False, pass_fds=(),
                 encoding=None, errors=None, text=None):
        """
        Build the mock process for one simulated ``Popen`` call.

        :param mock_class:
            The :class:`MockPopen` creating this instance. Its ``commands``,
            ``default_behaviour``, ``all_calls`` and ``mock`` are used.
        :param root_call:
            The :func:`~unittest.mock.call` that recorded calls are
            chained from when added to ``all_calls``.
        :param args:
            The command, as a string or a sequence of parts; it is
            normalised with ``shell_join`` to look up the behaviour.
        :param stdin: If equal to ``PIPE``, :attr:`stdin` becomes a recording
            mock.
        :param stdout: If equal to ``PIPE``, :attr:`stdout` becomes a file
            holding the simulated output.
        :param stderr: If equal to ``PIPE``, :attr:`stderr` becomes a file
            holding the simulated error output. If equal to ``STDOUT``, the
            error output is merged into the standard output instead.
        :param universal_newlines: See ``encoding``.
        :param text: See ``encoding``.
        :param encoding: See below.
        :param errors:
            On Python 3, if any of ``universal_newlines``, ``text``,
            ``encoding`` or ``errors`` is truthy, piped output files are
            wrapped in a :class:`io.TextIOWrapper` built with ``encoding``
            and ``errors``.

        All other parameters are accepted but not used by this mock.

        :raises KeyError:
            If no behaviour is registered for the command and no default
            behaviour has been set.
        """
        self.mock = Mock()
        self.class_instance_mock = mock_class.mock.Popen_instance
        #: A :func:`unittest.mock.call` representing the call made to instantiate
        #: this mock process.
        self.root_call = root_call
        #: The calls made on this mock process, represented using
        #: :func:`~unittest.mock.call` instances.
        self.calls = []
        self.all_calls = mock_class.all_calls

        cmd = shell_join(args)

        behaviour = mock_class.commands.get(cmd, mock_class.default_behaviour)
        if behaviour is None:
            raise KeyError('Nothing specified for command %r' % cmd)

        # A callable behaviour is a factory: it is handed the command and the
        # stdin argument and supplies the behaviour object to use.
        if callable(behaviour):
            behaviour = behaviour(command=cmd, stdin=stdin)

        self.behaviour = behaviour

        stdout_value = behaviour.stdout
        stderr_value = behaviour.stderr

        if stderr == STDOUT:
            # Interleave the two outputs line by line (stdout line, stderr
            # line, stdout line, ...); zip_longest pads the shorter side with
            # None, which the filter below drops along with empty lines.
            line_iterator = chain.from_iterable(zip_longest(
                stdout_value.splitlines(True),
                stderr_value.splitlines(True)
            ))
            stdout_value = b''.join(line for line in line_iterator if line)
            stderr_value = None

        self.poll_count = behaviour.poll_count

        # subprocess.Popen opens its pipes in text mode when encoding or
        # errors is given, or when text / universal_newlines is true, so
        # errors on its own must also select text mode here.
        text_mode = PY3 and (
            universal_newlines or text or encoding or errors
        )

        for name, option, mock_value in (
            ('stdout', stdout, stdout_value),
            ('stderr', stderr, stderr_value)
        ):
            value = None
            # Compared by equality, like the stdin and STDOUT checks in this
            # method, rather than by identity of an int constant.
            if option == PIPE:
                value = TemporaryFile()
                value.write(mock_value)
                value.flush()
                value.seek(0)
                if text_mode:
                    value = TextIOWrapper(
                        value, encoding=encoding, errors=errors
                    )
            setattr(self, name, value)

        if stdin == PIPE:
            self.stdin = Mock()
            for method in 'write', 'close':
                record_writes = partial(self._record, ('stdin', method))
                getattr(self.stdin, method).side_effect = record_writes

        self.pid = behaviour.pid
        #: The return code of this mock process.
        self.returncode = None
        if PY3:
            self.args = args

    def _record(self, names, *args, **kw):
        """
        Record one call, identified by the attribute path ``names``.

        The call is replayed on both the class-level instance mock and this
        instance's own mock, then appended as a
        :func:`~unittest.mock.call` to :attr:`calls` and, chained from
        :attr:`root_call`, to ``all_calls``.
        """
        for mock in self.class_instance_mock, self.mock:
            reduce(getattr, names, mock)(*args, **kw)
        for base_call, store in (
            (call, self.calls),
            (self.root_call, self.all_calls)
        ):
            store.append(reduce(getattr, names, base_call)(*args, **kw))

    if PY3:

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_val, exc_tb):
            # Leaving the context waits for the process (which is recorded
            # like any other wait call) and closes any output files.
            self.wait()
            for stream in self.stdout, self.stderr:
                if stream:
                    stream.close()

        @record
        def wait(self, timeout=None):
            "Simulate calls to :meth:`subprocess.Popen.wait`"
            self.returncode = self.behaviour.returncode
            return self.returncode

        @record
        def communicate(self, input=None, timeout=None):
            "Simulate calls to :meth:`subprocess.Popen.communicate`"
            self.returncode = self.behaviour.returncode
            # A stream that was not piped is None and is returned as such.
            return (self.stdout and self.stdout.read(),
                    self.stderr and self.stderr.read())

    else:

        @record
        def wait(self):  # pragma: no cover
            "Simulate calls to :meth:`subprocess.Popen.wait`"
            self.returncode = self.behaviour.returncode
            return self.returncode

        @record
        def communicate(self, input=None):
            "Simulate calls to :meth:`subprocess.Popen.communicate`"
            self.returncode = self.behaviour.returncode
            # A stream that was not piped is None and is returned as such.
            return (self.stdout and self.stdout.read(),
                    self.stderr and self.stderr.read())

    @record
    def poll(self):
        "Simulate calls to :meth:`subprocess.Popen.poll`"
        # While polls remain and no return code has been set yet, use up one
        # poll and report the process as still running.
        if self.poll_count and self.returncode is None:
            self.poll_count -= 1
            return None
        # The configured polls are used up (or the return code was already
        # set), so the simulated process is treated as finished: the return
        # code is taken from the behaviour and reported.
        self.returncode = self.behaviour.returncode
        return self.returncode

    @record
    def send_signal(self, signal):
        "Simulate calls to :meth:`subprocess.Popen.send_signal`"
        pass

    @record
    def terminate(self):
        "Simulate calls to :meth:`subprocess.Popen.terminate`"
        pass

    @record
    def kill(self):
        "Simulate calls to :meth:`subprocess.Popen.kill`"
        pass
class MockPopen(object):
    """
    A specialised mock for testing code that uses :class:`subprocess.Popen`.

    An instance can stand in for :class:`subprocess.Popen` and is often put
    in place using :func:`unittest.mock.patch` or a
    :class:`~testfixtures.Replacer`. Calling the instance returns a
    :class:`MockPopenInstance`.
    """

    #: The behaviour used for commands that have no entry of their own;
    #: ``None`` until :meth:`set_default` is called.
    default_behaviour = None

    def __init__(self):
        # Behaviours registered with set_command, keyed by command string.
        self.commands = {}
        self.mock = Mock()
        #: All calls made using this mock and the objects it returns, represented using
        #: :func:`~unittest.mock.call` instances.
        self.all_calls = []

    def _resolve_behaviour(self, stdout, stderr, returncode,
                           pid, poll_count, behaviour):
        # An explicitly supplied behaviour wins; otherwise one is built from
        # the individual values.
        if behaviour is not None:
            return behaviour
        return PopenBehaviour(stdout, stderr, returncode, pid, poll_count)

    def set_command(self, command, stdout=b'', stderr=b'', returncode=0,
                    pid=1234, poll_count=3, behaviour=None):
        """
        Set the behaviour of this mock when it is used to simulate the
        specified command.

        :param command: A string representing the command to be simulated.
        """
        resolved = self._resolve_behaviour(
            stdout, stderr, returncode, pid, poll_count, behaviour
        )
        self.commands[shell_join(command)] = resolved

    def set_default(self, stdout=b'', stderr=b'', returncode=0,
                    pid=1234, poll_count=3, behaviour=None):
        """
        Set the behaviour of this mock when it is used to simulate commands
        that have no explicit behaviour specified using
        :meth:`~MockPopen.set_command`.
        """
        resolved = self._resolve_behaviour(
            stdout, stderr, returncode, pid, poll_count, behaviour
        )
        self.default_behaviour = resolved

    def __call__(self, *args, **kw):
        """
        Simulate instantiating :class:`subprocess.Popen`.

        The call is recorded on ``self.mock.Popen`` and in ``all_calls``, and
        a :class:`MockPopenInstance` built from the same arguments is
        returned.
        """
        self.mock.Popen(*args, **kw)
        popen_call = call.Popen(*args, **kw)
        self.all_calls.append(popen_call)
        return MockPopenInstance(self, popen_call, *args, **kw)
# Shared reST documentation for the behaviour parameters accepted by
# MockPopen.set_command and MockPopen.set_default.
set_command_params = """
:param stdout:
    Bytes representing the simulated content written by the process
    to the stdout pipe.
:param stderr:
    Bytes representing the simulated content written by the process
    to the stderr pipe.
:param returncode:
    An integer representing the return code of the simulated process.
:param pid:
    An integer representing the process identifier of the simulated
    process. This is useful if you have code that prints out the pids
    of running processes.
:param poll_count:
    Specifies the number of times :meth:`MockPopenInstance.poll` can be
    called before :attr:`MockPopenInstance.returncode` is set and returned
    by :meth:`MockPopenInstance.poll`.

If supplied, ``behaviour`` must be either a :class:`PopenBehaviour`
instance or a callable that takes the ``command`` string representing
the command to be simulated and the ``stdin`` for that command and
returns a :class:`PopenBehaviour` instance.
"""
# Add the shared parameter docs in set_command_params to the docstrings of
# both setter methods, so that only one copy of them has to be maintained.
# extend_docstring is imported from testfixtures.utils.
extend_docstring(set_command_params,
                 [MockPopen.set_command, MockPopen.set_default])