-
Notifications
You must be signed in to change notification settings - Fork 240
/
subprocess.py
171 lines (141 loc) · 5.56 KB
/
subprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# curio/subprocess.py
#
# A curio-compatible standin for the subprocess module. Provides
# asynchronous compatible versions of Popen(), check_output(),
# and run() functions.
__all__ = ['run', 'Popen', 'CompletedProcess', 'CalledProcessError',
'SubprocessError', 'check_output', 'PIPE', 'STDOUT', 'DEVNULL']
# -- Standard Library
import subprocess
import os
import sys
from subprocess import (
CompletedProcess,
SubprocessError,
CalledProcessError,
PIPE,
STDOUT,
DEVNULL,
)
# -- Curio
from .task import spawn
from .time import sleep
from .errors import CancelledError
from .io import FileStream
from . import thread
from .workers import run_in_thread
if sys.platform.startswith('win'):
from .file import AsyncFile as FileStream
class Popen(object):
'''
Curio wrapper around the Popen class from the subprocess module. All of the
methods from subprocess.Popen should be available, but the associated file
objects for stdin, stdout, stderr have been replaced by async versions.
Certain blocking operations (e.g., wait() and communicate()) have been
replaced by async compatible implementations. Explicit timeouts
are not available. Use the timeout_after() function for timeouts.
'''
def __init__(self, args, **kwargs):
if 'universal_newlines' in kwargs:
raise RuntimeError('universal_newlines argument not supported')
# If stdin has been given and it's set to a curio FileStream object,
# then we need to flip it to blocking.
if 'stdin' in kwargs:
stdin = kwargs['stdin']
if isinstance(stdin, FileStream):
# At hell's heart I stab thy coroutine attempting to read from a stream
# that's been used as a pipe input to a subprocess. Must set back to
# blocking or all hell breaks loose in the child.
if hasattr(os, 'set_blocking'):
os.set_blocking(stdin.fileno(), True)
self._popen = subprocess.Popen(args, **kwargs)
if self._popen.stdin:
self.stdin = FileStream(self._popen.stdin)
if self._popen.stdout:
self.stdout = FileStream(self._popen.stdout)
if self._popen.stderr:
self.stderr = FileStream(self._popen.stderr)
def __getattr__(self, name):
return getattr(self._popen, name)
async def wait(self):
retcode = self._popen.poll()
if retcode is None:
retcode = await run_in_thread(self._popen.wait)
return retcode
async def communicate(self, input=b''):
'''
Communicates with a subprocess. input argument gives data to
feed to the subprocess stdin stream. Returns a tuple (stdout, stderr)
corresponding to the process output. If cancelled, the resulting
cancellation exception has stdout_completed and stderr_completed
attributes attached containing the bytes read so far.
'''
stdout_task = await spawn(self.stdout.readall) if self.stdout else None
stderr_task = await spawn(self.stderr.readall) if self.stderr else None
try:
if input:
await self.stdin.write(input)
await self.stdin.close()
stdout = await stdout_task.join() if stdout_task else b''
stderr = await stderr_task.join() if stderr_task else b''
return (stdout, stderr)
except CancelledError as err:
if stdout_task:
await stdout_task.cancel()
err.stdout = stdout_task.exception.bytes_read
else:
err.stdout = b''
if stderr_task:
await stderr_task.cancel()
err.stderr = stderr_task.exception.bytes_read
else:
err.stderr = b''
raise
async def __aenter__(self):
return self
async def __aexit__(self, *args):
if self.stdout:
await self.stdout.close()
if self.stderr:
await self.stderr.close()
if self.stdin:
await self.stdin.close()
# Wait for the process to terminate
await self.wait()
def __enter__(self):
return thread.AWAIT(self.__aenter__())
def __exit__(self, *args):
return thread.AWAIT(self.__aexit__(*args))
async def run(args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, check=False):
'''
Curio-compatible version of subprocess.run()
'''
if input:
stdin = subprocess.PIPE
else:
stdin = None
async with Popen(args, stdin=stdin, stdout=stdout, stderr=stderr, shell=shell) as process:
try:
stdout, stderr = await process.communicate(input)
except CancelledError as err:
process.kill()
stdout, stderr = await process.communicate()
# Append the remaining stdout, stderr to the exception
err.stdout += stdout
err.stderr += stderr
raise err
except:
process.kill()
raise
retcode = process.poll()
if check and retcode:
raise CalledProcessError(retcode, process.args,
output=stdout, stderr=stderr)
return CompletedProcess(process.args, retcode, stdout, stderr)
async def check_output(args, *, stdin=None, stderr=None, shell=False, input=None):
'''
Curio compatible version of subprocess.check_output()
'''
out = await run(args, stdout=PIPE, stdin=stdin, stderr=stderr, shell=shell,
check=True, input=input)
return out.stdout