/
util.py
600 lines (461 loc) · 17.5 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
# encoding: utf-8
"""
This module provides utility methods for dealing with path-specs.
"""
import os
import os.path
import posixpath
import stat
from .compat import Collection, Iterable, string_types, unicode
# Collect the platform separators (os.sep and os.altsep) that differ
# from the POSIX separator. Falsy entries (os.altsep may be None) and
# '/' itself are filtered out. Used by normalize_file() as the default
# set of separators to rewrite.
NORMALIZE_PATH_SEPS = [sep for sep in [os.sep, os.altsep] if sep and sep != posixpath.sep]
"""
*NORMALIZE_PATH_SEPS* (:class:`list` of :class:`str`) contains the path
separators that need to be normalized to the POSIX separator for the
current operating system. The separators are determined by examining
:data:`os.sep` and :data:`os.altsep`.
"""
# Module-level registry of pattern factories keyed by name. It is
# written by register_pattern() and read by lookup_pattern().
_registered_patterns = {}
"""
*_registered_patterns* (:class:`dict`) maps a name (:class:`str`) to the
registered pattern factory (:class:`~collections.abc.Callable`).
"""
def detailed_match_files(patterns, files, all_matches=None):
    """
    Matches the files to the patterns, and returns which patterns matched
    the files.

    *patterns* (:class:`~collections.abc.Iterable` of :class:`~pathspec.pattern.Pattern`)
    contains the patterns to use. Patterns whose ``include`` attribute is
    :data:`None` are skipped.

    *files* (:class:`~collections.abc.Iterable` of :class:`str`) contains
    the normalized file paths to be matched against *patterns*. A
    one-shot iterable is materialized into a list so that every pattern
    can scan it.

    *all_matches* (:class:`bool` or :data:`None`) is whether to return all
    matched patterns (:data:`True`), or only the last matched pattern
    (:data:`False`). Default is :data:`None` for :data:`False`.

    Returns the matched files (:class:`dict`) which maps each matched file
    (:class:`str`) to the patterns that matched in order (:class:`.MatchDetail`).
    """
    all_files = files if isinstance(files, Collection) else list(files)
    return_files = {}
    for pattern in patterns:
        if pattern.include is None:
            # Null patterns take no part in matching.
            continue

        result_files = pattern.match(all_files)
        if pattern.include:
            # Add files and record pattern.
            for result_file in result_files:
                detail = return_files.get(result_file)
                if detail is None:
                    return_files[result_file] = MatchDetail([pattern])
                elif all_matches:
                    detail.patterns.append(pattern)
                else:
                    # Only the most recent matching pattern is kept.
                    detail.patterns[0] = pattern
        else:
            # Remove files. An exclusion pattern may match a file that no
            # earlier pattern included; that must be a no-op rather than a
            # KeyError (this mirrors the set-based `match_files()`).
            for file in result_files:
                return_files.pop(file, None)

    return return_files
def _is_iterable(value):
    """
    Tell whether *value* is a non-string iterable.

    *value* is the value to check.

    Returns :data:`True` (:class:`bool`) when *value* is an
    :class:`~collections.abc.Iterable` that is neither a text string nor
    a byte string; otherwise, :data:`False`.
    """
    # Strings are iterable but are treated as scalars here.
    if isinstance(value, (unicode, bytes)):
        return False
    return isinstance(value, Iterable)
def iter_tree_entries(root, on_error=None, follow_links=None):
    """
    Recursively walks *root* and yields every file and directory found.

    *root* (:class:`str`) is the directory to start the walk from. It is
    made absolute before walking.

    *on_error* (:class:`~collections.abc.Callable` or :data:`None`)
    optionally is the handler for file-system exceptions. It is called
    with the exception (:exc:`OSError`); reraise from the handler to
    abort the walk. Default is :data:`None` to ignore such exceptions.

    *follow_links* (:class:`bool` or :data:`None`) optionally is whether
    to descend into symbolic links that resolve to directories. Default
    is :data:`None` for :data:`True`.

    Raises :exc:`TypeError` if *on_error* is given but not callable, and
    :exc:`RecursionError` if recursion is detected. Because this is a
    generator, both are raised on iteration rather than at call time.

    Returns an :class:`~collections.abc.Iterable` yielding each file or
    directory entry (:class:`.TreeEntry`) relative to *root*.
    """
    handler_ok = on_error is None or callable(on_error)
    if not handler_ok:
        raise TypeError("on_error:{!r} is not callable.".format(on_error))

    walk_links = True if follow_links is None else follow_links

    # The empty relative path denotes the root itself; the fresh dict is
    # the ancestor memo used for recursion detection.
    walker = _iter_tree_entries_next(os.path.abspath(root), '', {}, on_error, walk_links)
    for tree_entry in walker:
        yield tree_entry
def iter_tree_files(root, on_error=None, follow_links=None):
    """
    Recursively walks *root* and yields the path of every non-directory
    entry found.

    *root* (:class:`str`) is the directory to search for files. It is
    made absolute before walking.

    *on_error* (:class:`~collections.abc.Callable` or :data:`None`)
    optionally is the handler for file-system exceptions. It is called
    with the exception (:exc:`OSError`); reraise from the handler to
    abort the walk. Default is :data:`None` to ignore such exceptions.

    *follow_links* (:class:`bool` or :data:`None`) optionally is whether
    to descend into symbolic links that resolve to directories. Default
    is :data:`None` for :data:`True`.

    Raises :exc:`TypeError` if *on_error* is given but not callable, and
    :exc:`RecursionError` if recursion is detected. Because this is a
    generator, both are raised on iteration rather than at call time.

    Returns an :class:`~collections.abc.Iterable` yielding the path to
    each file (:class:`str`) relative to *root*.
    """
    handler_ok = on_error is None or callable(on_error)
    if not handler_ok:
        raise TypeError("on_error:{!r} is not callable.".format(on_error))

    walk_links = True if follow_links is None else follow_links

    walker = _iter_tree_entries_next(os.path.abspath(root), '', {}, on_error, walk_links)
    for tree_entry in walker:
        # Directories are walked but not reported. With links not being
        # followed, a link to a directory is not a directory here and is
        # therefore reported.
        if tree_entry.is_dir(walk_links):
            continue
        yield tree_entry.path


# Alias `iter_tree_files()` as `iter_tree()`.
iter_tree = iter_tree_files
def _iter_tree_entries_next(root_full, dir_rel, memo, on_error, follow_links):
    """
    Scans one directory and, recursively, everything beneath it.

    *root_full* (:class:`str`) the absolute path to the root directory.

    *dir_rel* (:class:`str`) the path to the directory to scan relative to
    *root_full*.

    *memo* (:class:`dict`) keeps track of the ancestor directories
    currently being walked. Maps each ancestor real path (:class:`str`)
    to its relative path (:class:`str`).

    *on_error* (:class:`~collections.abc.Callable` or :data:`None`)
    optionally is the error handler for file-system exceptions raised
    while inspecting a child entry.

    *follow_links* (:class:`bool`) is whether to walk symbolic links that
    resolve to directories.

    Raises :exc:`RecursionError` if the real path of this directory is
    already among its ancestors.

    Yields each entry (:class:`.TreeEntry`).
    """
    here_full = os.path.join(root_full, dir_rel)
    here_real = os.path.realpath(here_full)

    # A canonical (real) path that is already in the ancestor memo means
    # a link has led back into a directory still being walked.
    if here_real in memo:
        raise RecursionError(real_path=here_real, first_path=memo[here_real], second_path=dir_rel)
    memo[here_real] = dir_rel

    # NOTE: an error from os.listdir() itself is not routed to on_error.
    for child_name in os.listdir(here_full):
        child_rel = os.path.join(dir_rel, child_name)
        child_full = os.path.join(root_full, child_rel)

        # Inspect the child; for a link, also inspect its target.
        try:
            child_lstat = os.lstat(child_full)
            is_link = stat.S_ISLNK(child_lstat.st_mode)
            child_stat = os.stat(child_full) if is_link else child_lstat
        except OSError as err:
            if on_error is not None:
                on_error(err)
            continue

        # Descend into real directories always, and into linked
        # directories only when links are being followed.
        descend = stat.S_ISDIR(child_stat.st_mode) and (follow_links or not is_link)
        if not descend and not (stat.S_ISREG(child_stat.st_mode) or is_link):
            # Neither a directory to walk, a regular file, nor a link.
            continue

        yield TreeEntry(child_name, child_rel, child_lstat, child_stat)

        if descend:
            for descendant in _iter_tree_entries_next(root_full, child_rel, memo, on_error, follow_links):
                yield descendant

    # NOTE: Drop this directory from the ancestors memo once it has been
    # fully walked. This allows the same directory to appear multiple
    # times under different paths without being mistaken for recursion.
    # See <https://github.com/cpburnz/python-path-specification/pull/7>.
    del memo[here_real]
def lookup_pattern(name):
    """
    Looks up a registered pattern factory by name.

    *name* (:class:`str`) is the name of the pattern factory.

    Returns the registered pattern factory (:class:`~collections.abc.Callable`).
    If no pattern factory is registered under *name*, raises :exc:`KeyError`.
    """
    factory = _registered_patterns[name]
    return factory
def match_file(patterns, file):
    """
    Matches a single file against the patterns.

    *patterns* (:class:`~collections.abc.Iterable` of :class:`~pathspec.pattern.Pattern`)
    contains the patterns to use. Patterns whose ``include`` attribute is
    :data:`None` are skipped.

    *file* (:class:`str`) is the normalized file path to be matched
    against *patterns*.

    Returns the ``include`` value of the last pattern that matched
    *file*, i.e. :data:`True` if *file* matched; otherwise, :data:`False`.
    """
    verdict = False
    for candidate in patterns:
        if candidate.include is None:
            continue
        # Later patterns override earlier ones, so keep scanning.
        if file in candidate.match((file,)):
            verdict = candidate.include
    return verdict
def match_files(patterns, files):
    """
    Matches the files against the patterns.

    *patterns* (:class:`~collections.abc.Iterable` of :class:`~pathspec.pattern.Pattern`)
    contains the patterns to use. Patterns whose ``include`` attribute is
    :data:`None` are skipped.

    *files* (:class:`~collections.abc.Iterable` of :class:`str`) contains
    the normalized file paths to be matched against *patterns*. A
    one-shot iterable is materialized into a list so that every pattern
    can scan it.

    Returns the matched files (:class:`set` of :class:`str`).
    """
    candidates = files if isinstance(files, Collection) else list(files)
    matched = set()
    for pattern in patterns:
        if pattern.include is None:
            continue
        hits = pattern.match(candidates)
        # Inclusion patterns add their hits; exclusion patterns remove them.
        apply_hits = matched.update if pattern.include else matched.difference_update
        apply_hits(hits)
    return matched
def _normalize_entries(entries, separators=None):
    """
    Indexes the entries by their POSIX-normalized path.

    *entries* (:class:`~collections.abc.Iterable` of :class:`.TreeEntry`)
    contains the entries to be normalized.

    *separators* (:class:`~collections.abc.Collection` of :class:`str`; or
    :data:`None`) optionally contains the path separators to normalize.
    See :func:`normalize_file` for more information.

    Returns a :class:`dict` mapping each normalized file path (:class:`str`)
    to the entry (:class:`.TreeEntry`). When several entries normalize to
    the same path, the last one wins.
    """
    return {
        normalize_file(item.path, separators=separators): item
        for item in entries
    }
def normalize_file(file, separators=None):
    """
    Normalizes the file path to use the POSIX path separator (i.e., ``'/'``).

    *file* (:class:`str` or :class:`pathlib.PurePath`) is the file path.
    It is converted with :class:`str` before processing.

    *separators* (:class:`~collections.abc.Collection` of :class:`str`; or
    :data:`None`) optionally contains the path separators to normalize.
    This does not need to include the POSIX path separator (``'/'``), but
    including it will not affect the results. Default is :data:`None` for
    :data:`NORMALIZE_PATH_SEPS`. To prevent normalization, pass an empty
    container (e.g., an empty tuple ``()``).

    Returns the normalized file path (:class:`str`). A single leading
    ``'./'`` is stripped after the separators have been replaced.
    """
    seps = NORMALIZE_PATH_SEPS if separators is None else separators

    # Convert a path object to a string, then rewrite each separator.
    result = str(file)
    for old_sep in seps:
        result = result.replace(old_sep, posixpath.sep)

    # Remove current directory prefix.
    return result[2:] if result.startswith('./') else result
def normalize_files(files, separators=None):
    """
    Indexes the file paths by their POSIX-normalized form.

    *files* (:class:`~collections.abc.Iterable` of :class:`str` or
    :class:`pathlib.PurePath`) contains the file paths to be normalized.

    *separators* (:class:`~collections.abc.Collection` of :class:`str`; or
    :data:`None`) optionally contains the path separators to normalize.
    See :func:`normalize_file` for more information.

    Returns a :class:`dict` mapping each normalized file path (:class:`str`)
    to the original file path as it was given. When several paths
    normalize to the same string, the last one wins.
    """
    return {
        normalize_file(original, separators=separators): original
        for original in files
    }
def register_pattern(name, pattern_factory, override=None):
    """
    Registers the specified pattern factory.

    *name* (:class:`str`) is the name to register the pattern factory
    under.

    *pattern_factory* (:class:`~collections.abc.Callable`) is used to
    compile patterns. It must accept an uncompiled pattern (:class:`str`)
    and return the compiled pattern (:class:`.Pattern`).

    *override* (:class:`bool` or :data:`None`) optionally is whether to
    allow overriding an already registered pattern under the same name
    (:data:`True`), instead of raising an :exc:`AlreadyRegisteredError`
    (:data:`False`). Default is :data:`None` for :data:`False`.

    Raises :exc:`TypeError` if *name* is not a string or *pattern_factory*
    is not callable.
    """
    name_ok = isinstance(name, string_types)
    if not name_ok:
        raise TypeError("name:{!r} is not a string.".format(name))

    factory_ok = callable(pattern_factory)
    if not factory_ok:
        raise TypeError("pattern_factory:{!r} is not callable.".format(pattern_factory))

    # Refuse to replace an existing registration unless explicitly allowed.
    if not override and name in _registered_patterns:
        raise AlreadyRegisteredError(name, _registered_patterns[name])

    _registered_patterns[name] = pattern_factory
class AlreadyRegisteredError(Exception):
    """
    The :exc:`AlreadyRegisteredError` exception is raised when a pattern
    factory is registered under a name that is already in use.
    """

    def __init__(self, name, pattern_factory):
        """
        Initializes the :exc:`AlreadyRegisteredError` instance.

        *name* (:class:`str`) is the name of the registered pattern.

        *pattern_factory* (:class:`~collections.abc.Callable`) is the
        registered pattern factory.
        """
        # Both values are stored positionally in ``args``; the properties
        # below read them back by index.
        super(AlreadyRegisteredError, self).__init__(name, pattern_factory)

    @property
    def name(self):
        """
        *name* (:class:`str`) is the name of the registered pattern.
        """
        return self.args[0]

    @property
    def pattern_factory(self):
        """
        *pattern_factory* (:class:`~collections.abc.Callable`) is the
        registered pattern factory.
        """
        return self.args[1]

    @property
    def message(self):
        """
        *message* (:class:`str`) is the error message.
        """
        template = "{name!r} is already registered for pattern factory:{pattern_factory!r}."
        return template.format(name=self.name, pattern_factory=self.pattern_factory)
class RecursionError(Exception):
    """
    The :exc:`RecursionError` exception is raised when recursion is
    detected.
    """

    def __init__(self, real_path, first_path, second_path):
        """
        Initializes the :exc:`RecursionError` instance.

        *real_path* (:class:`str`) is the real path that recursion was
        encountered on.

        *first_path* (:class:`str`) is the first path encountered for
        *real_path*.

        *second_path* (:class:`str`) is the second path encountered for
        *real_path*.
        """
        # All three values are stored positionally in ``args``; the
        # properties below read them back by index.
        super(RecursionError, self).__init__(real_path, first_path, second_path)

    @property
    def real_path(self):
        """
        *real_path* (:class:`str`) is the real path that recursion was
        encountered on.
        """
        return self.args[0]

    @property
    def first_path(self):
        """
        *first_path* (:class:`str`) is the first path encountered for
        :attr:`self.real_path <RecursionError.real_path>`.
        """
        return self.args[1]

    @property
    def second_path(self):
        """
        *second_path* (:class:`str`) is the second path encountered for
        :attr:`self.real_path <RecursionError.real_path>`.
        """
        return self.args[2]

    @property
    def message(self):
        """
        *message* (:class:`str`) is the error message.
        """
        template = "Real path {real!r} was encountered at {first!r} and then {second!r}."
        return template.format(
            real=self.real_path,
            first=self.first_path,
            second=self.second_path,
        )
class MatchDetail(object):
    """
    The :class:`.MatchDetail` class contains information about the
    patterns that matched a file.
    """

    #: Make the class dict-less.
    __slots__ = ('patterns',)

    def __init__(self, patterns):
        """
        Initialize the :class:`.MatchDetail` instance.

        *patterns* (:class:`~collections.abc.Sequence` of :class:`~pathspec.pattern.Pattern`)
        contains the patterns that matched the file in the order they were
        encountered.
        """
        #: *patterns* (:class:`~collections.abc.Sequence` of
        #: :class:`~pathspec.pattern.Pattern`) contains the patterns that
        #: matched the file in the order they were encountered. The given
        #: object is stored as-is, not copied.
        self.patterns = patterns
class TreeEntry(object):
    """
    The :class:`.TreeEntry` class contains information about a file-system
    entry.
    """

    #: Make the class dict-less.
    __slots__ = ('_lstat', 'name', 'path', '_stat')

    def __init__(self, name, path, lstat, stat):
        """
        Initialize the :class:`.TreeEntry` instance.

        *name* (:class:`str`) is the base name of the entry.

        *path* (:class:`str`) is the relative path of the entry.

        *lstat* (:class:`~os.stat_result`) is the stat result of the direct
        entry.

        *stat* (:class:`~os.stat_result`) is the stat result of the entry,
        potentially linked.
        """
        #: *name* (:class:`str`) is the base name of the entry.
        self.name = name

        #: *path* (:class:`str`) is the path of the entry.
        self.path = path

        #: *_lstat* (:class:`~os.stat_result`) is the stat result of the
        #: direct entry.
        self._lstat = lstat

        #: *_stat* (:class:`~os.stat_result`) is the stat result of the
        #: linked entry.
        self._stat = stat

    def is_dir(self, follow_links=None):
        """
        Get whether the entry is a directory.

        *follow_links* (:class:`bool` or :data:`None`) is whether to follow
        symbolic links. If this is :data:`True`, a symlink to a directory
        will result in :data:`True`. Default is :data:`None` for :data:`True`.

        Returns whether the entry is a directory (:class:`bool`).
        """
        if follow_links is None or follow_links:
            mode = self._stat.st_mode
        else:
            mode = self._lstat.st_mode
        return stat.S_ISDIR(mode)

    def is_file(self, follow_links=None):
        """
        Get whether the entry is a regular file.

        *follow_links* (:class:`bool` or :data:`None`) is whether to follow
        symbolic links. If this is :data:`True`, a symlink to a regular file
        will result in :data:`True`. Default is :data:`None` for :data:`True`.

        Returns whether the entry is a regular file (:class:`bool`).
        """
        if follow_links is None or follow_links:
            mode = self._stat.st_mode
        else:
            mode = self._lstat.st_mode
        return stat.S_ISREG(mode)

    def is_symlink(self):
        """
        Returns whether the entry is a symbolic link (:class:`bool`).
        """
        # Only the link-level stat result can reveal a symlink.
        link_mode = self._lstat.st_mode
        return stat.S_ISLNK(link_mode)

    def stat(self, follow_links=None):
        """
        Get the cached stat result for the entry.

        *follow_links* (:class:`bool` or :data:`None`) is whether to follow
        symbolic links. If this is :data:`True`, the stat result of the
        linked file will be returned. Default is :data:`None` for :data:`True`.

        Returns that stat result (:class:`~os.stat_result`).
        """
        if follow_links is None or follow_links:
            return self._stat
        return self._lstat