/
core.py
268 lines (207 loc) · 7.92 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
"""This module provides core functions for handling unicode and UNIX quirks
The @interruptable functions retry when system calls are interrupted,
e.g. when python raises an IOError or OSError with errno == EINTR.
"""
from __future__ import division, absolute_import, unicode_literals
import os
import sys
import itertools
import platform
import subprocess
from cola.decorators import interruptable
from cola.compat import ustr
from cola.compat import PY3
# Some files are not in UTF-8; some other aren't in any codification.
# Remember that GIT doesn't care about encodings (saves binary data)
_encoding_tests = [
'utf-8',
'iso-8859-15',
'windows1252',
'ascii',
# <-- add encodings here
]
def decode(enc, encoding=None):
"""decode(encoded_string) returns an unencoded unicode string
"""
if enc is None or type(enc) is ustr:
return enc
if encoding is None:
encoding_tests = _encoding_tests
else:
encoding_tests = itertools.chain([encoding], _encoding_tests)
for encoding in encoding_tests:
try:
return enc.decode(encoding)
except:
pass
# this shouldn't ever happen... FIXME
return ustr(enc)
def encode(string, encoding=None):
"""encode(unencoded_string) returns a string encoded in utf-8
"""
if type(string) is not ustr:
return string
return string.encode(encoding or 'utf-8', 'replace')
def read(filename, size=-1, encoding=None):
"""Read filename and return contents"""
with xopen(filename, 'r') as fh:
return fread(fh, size=size, encoding=encoding)
def write(path, contents, encoding=None):
"""Writes a unicode string to a file"""
with xopen(path, 'wb') as fh:
return fwrite(fh, contents, encoding=encoding)
@interruptable
def fread(fh, size=-1, encoding=None):
"""Read from a filehandle and retry when interrupted"""
return decode(fh.read(size), encoding=encoding)
@interruptable
def fwrite(fh, content, encoding=None):
"""Write to a filehandle and retry when interrupted"""
return fh.write(encode(content, encoding=encoding))
@interruptable
def wait(proc):
"""Wait on a subprocess and retry when interrupted"""
return proc.wait()
@interruptable
def readline(fh, encoding=None):
return decode(fh.readline(), encoding=encoding)
@interruptable
def start_command(cmd, cwd=None, add_env=None,
universal_newlines=False,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**extra):
"""Start the given command, and return a subprocess object.
This provides a simpler interface to the subprocess module.
"""
env = None
if add_env is not None:
env = os.environ.copy()
env.update(add_env)
if PY3:
# Python3 on windows always goes through list2cmdline() internally inside
# of subprocess.py so we must provide unicode strings here otherwise
# Python3 breaks when bytes are provided.
#
# Additionally, the preferred usage on Python3 is to pass unicode
# strings to subprocess. Python will automatically encode into the
# default encoding (utf-8) when it gets unicode strings.
cmd = [decode(c) for c in cmd]
else:
cmd = [encode(c) for c in cmd]
return subprocess.Popen(cmd, bufsize=1, stdin=stdin, stdout=stdout,
stderr=stderr, cwd=cwd, env=env,
universal_newlines=universal_newlines, **extra)
@interruptable
def communicate(proc):
return proc.communicate()
def run_command(cmd, encoding=None, *args, **kwargs):
"""Run the given command to completion, and return its results.
This provides a simpler interface to the subprocess module.
The results are formatted as a 3-tuple: (exit_code, output, errors)
The other arguments are passed on to start_command().
"""
process = start_command(cmd, *args, **kwargs)
(output, errors) = communicate(process)
output = decode(output, encoding=encoding)
errors = decode(errors, encoding=encoding)
exit_code = process.returncode
return (exit_code, output, errors)
@interruptable
def _fork_posix(args, cwd=None):
"""Launch a process in the background."""
encoded_args = [encode(arg) for arg in args]
return subprocess.Popen(encoded_args, cwd=cwd).pid
def _fork_win32(args, cwd=None):
"""Launch a background process using crazy win32 voodoo."""
# This is probably wrong, but it works. Windows.. wow.
if args[0] == 'git-dag':
# win32 can't exec python scripts
args = [sys.executable] + args
args[0] = _win32_find_exe(args[0])
if PY3:
# see comment in start_command()
argv = [decode(arg) for arg in args]
else:
argv = [encode(arg) for arg in args]
DETACHED_PROCESS = 0x00000008 # Amazing!
return subprocess.Popen(argv, cwd=cwd, creationflags=DETACHED_PROCESS).pid
def _win32_find_exe(exe):
"""Find the actual file for a Windows executable.
This function goes through the same process that the Windows shell uses to
locate an executable, taking into account the PATH and PATHEXT environment
variables. This allows us to avoid passing shell=True to subprocess.Popen.
For reference, see:
http://technet.microsoft.com/en-us/library/cc723564.aspx#XSLTsection127121120120
"""
# try the argument itself
candidates = [exe]
# if argument does not have an extension, also try it with each of the
# extensions specified in PATHEXT
if not '.' in exe:
candidates.extend(exe + ext
for ext in getenv('PATHEXT', '').split(os.pathsep)
if ext.startswith('.'))
# search the current directory first
for candidate in candidates:
if exists(candidate):
return candidate
# if the argument does not include a path separator, search each of the
# directories on the PATH
if not os.path.dirname(exe):
for path in getenv('PATH').split(os.pathsep):
if path:
for candidate in candidates:
full_path = os.path.join(path, candidate)
if exists(full_path):
return full_path
# not found, punt and return the argument unchanged
return exe
# Portability wrappers
if sys.platform == 'win32' or sys.platform == 'cygwin':
fork = _fork_win32
else:
fork = _fork_posix
def wrap(action, fn, decorator=None):
"""Wrap arguments with `action`, optionally decorate the result"""
if decorator is None:
decorator = lambda x: x
def wrapped(*args, **kwargs):
return decorator(fn(action(*args, **kwargs)))
return wrapped
def decorate(decorator, fn):
"""Decorate the result of `fn` with `action`"""
def decorated(*args, **kwargs):
return decorator(fn(*args, **kwargs))
return decorated
def exists(path, encoding=None):
return os.path.exists(encode(path), encoding=encoding)
def getenv(name, default=None):
return decode(os.getenv(name, default))
def xopen(path, mode='r', encoding=None):
return open(encode(path, encoding=encoding), mode)
def stdout(msg):
sys.stdout.write(encode(msg) + '\n')
def stderr(msg):
sys.stderr.write(encode(msg) + '\n')
@interruptable
def node():
return platform.node()
abspath = wrap(encode, os.path.abspath, decorator=decode)
chdir = wrap(encode, os.chdir)
exists = wrap(encode, os.path.exists)
expanduser = wrap(encode, os.path.expanduser, decorator=decode)
getcwd = decorate(decode, os.getcwd)
isdir = wrap(encode, os.path.isdir)
isfile = wrap(encode, os.path.isfile)
islink = wrap(encode, os.path.islink)
makedirs = wrap(encode, os.makedirs)
try:
readlink = wrap(encode, os.readlink, decorator=decode)
except AttributeError:
readlink = lambda p: p
realpath = wrap(encode, os.path.realpath, decorator=decode)
stat = wrap(encode, os.stat)
unlink = wrap(encode, os.unlink)
walk = wrap(encode, os.walk)