
Commit

Changes from code review.
atollk committed Sep 5, 2021
1 parent 3d127b5 commit 6f2fa13
Showing 5 changed files with 116 additions and 58 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -10,7 +10,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

### Added

- To `fs.walk.Walker`, added parameters `filter_glob` and `exclude_glob`.
- Added `filter_glob` and `exclude_glob` parameters to `fs.walk.Walker`.
Closes [#459](https://github.com/PyFilesystem/pyfilesystem2/issues/459).
- Added `fs.copy.copy_file_if`, `fs.copy.copy_dir_if`, and `fs.copy.copy_fs_if`.
Closes [#458](https://github.com/PyFilesystem/pyfilesystem2/issues/458).
8 changes: 5 additions & 3 deletions fs/base.py
@@ -1648,8 +1648,8 @@ def check(self):
if self.isclosed():
raise errors.FilesystemClosed()

def match(self, patterns, name, accept_prefix=False):
# type: (Optional[Iterable[Text]], Text, bool) -> bool
def match(self, patterns, name):
# type: (Optional[Iterable[Text]], Text) -> bool
"""Check if a name matches any of a list of wildcards.
If a filesystem is case *insensitive* (such as Windows) then
@@ -1706,7 +1706,7 @@ def match_glob(self, patterns, path, accept_prefix=False):
``['*.py']``, or `None` to match everything.
path (str): A resource path, starting with "/".
accept_prefix (bool): If ``True``, the path is
not required to match the wildcards themselves
not required to match the patterns themselves
but only need to be a prefix of a string that does.
Returns:
@@ -1724,6 +1724,8 @@ def match_glob(self, patterns, path, accept_prefix=False):
False
>>> my_fs.match_glob(['dir/file.txt'], '/dir/', accept_prefix=True)
True
>>> my_fs.match_glob(['dir/file.txt'], '/dir/', accept_prefix=False)
False
>>> my_fs.match_glob(['dir/file.txt'], '/dir/gile.txt', accept_prefix=True)
False
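A minimal usage sketch of the two checks touched above, mirroring the doctests in this diff: `match` takes bare filenames against wildcard patterns, while `match_glob` takes full resource paths and, with `accept_prefix=True`, also accepts paths that are only a prefix of a possible match.

```python
# Sketch only; expected results follow the doctests shown in this diff.
from fs import open_fs

my_fs = open_fs("mem://")

my_fs.match(["*.py"], "__init__.py")                              # True
my_fs.match_glob(["dir/file.txt"], "/dir/", accept_prefix=True)   # True
my_fs.match_glob(["dir/file.txt"], "/dir/", accept_prefix=False)  # False
```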
95 changes: 51 additions & 44 deletions fs/glob.py
@@ -33,10 +33,10 @@

_PATTERN_CACHE = LRUCache(
1000
) # type: LRUCache[Tuple[Text, bool], Tuple[int, bool, Pattern]]
) # type: LRUCache[Tuple[Text, bool], Tuple[Optional[int], Pattern]]


def _split_pattern_by_rec(pattern):
def _split_pattern_by_sep(pattern):
# type: (Text) -> List[Text]
"""Split a glob pattern at its directory seperators (/).
@@ -56,28 +56,27 @@ def _split_pattern_by_rec(pattern):
return [pattern[i + 1 : j] for i, j in zip(indices[:-1], indices[1:])]


def _translate(pattern, case_sensitive=True):
# type: (Text, bool) -> Text
"""Translate a wildcard pattern to a regular expression.
def _translate(pattern):
# type: (Text) -> Text
"""Translate a glob pattern without '**' to a regular expression.
There is no way to quote meta-characters.
Arguments:
pattern (str): A wildcard pattern.
case_sensitive (bool): Set to `False` to use a case
insensitive regex (default `True`).
pattern (str): A glob pattern.
Returns:
str: A regex equivalent to the given pattern.
"""
if not case_sensitive:
pattern = pattern.lower()
i, n = 0, len(pattern)
res = []
while i < n:
c = pattern[i]
i = i + 1
if c == "*":
if i < n and pattern[i] == "*":
raise ValueError("glob._translate does not support '**' patterns.")
res.append("[^/]*")
elif c == "?":
res.append("[^/]")
@@ -95,7 +94,7 @@ def _translate(pattern, case_sensitive=True):
stuff = pattern[i:j].replace("\\", "\\\\")
i = j + 1
if stuff[0] == "!":
stuff = "^" + stuff[1:]
stuff = "^/" + stuff[1:]
elif stuff[0] == "^":
stuff = "\\" + stuff
res.append("[%s]" % stuff)
@@ -104,27 +103,35 @@ def _translate(pattern, case_sensitive=True):
return "".join(res)


def _translate_glob(pattern, case_sensitive=True):
levels = 0
def _translate_glob(pattern):
# type: (Text) -> Tuple[Optional[int], Text]
"""Translate a glob pattern to a regular expression.
There is no way to quote meta-characters.
Arguments:
pattern (str): A glob pattern.
Returns:
Tuple[Optional[int], Text]: The first component describes the levels
of depth this glob pattern goes to; basically the number of "/" in
the pattern. If there is a "**" in the glob pattern, the depth is
basically unbounded, and this component is `None` instead.
The second component is the regular expression.
"""
recursive = False
re_patterns = [""]
for component in iteratepath(pattern):
if "**" in component:
recursive = True
split = component.split("**")
split_re = [_translate(s, case_sensitive=case_sensitive) for s in split]
split_re = [_translate(s) for s in split]
re_patterns.append("/?" + ".*/?".join(split_re))
else:
re_patterns.append(
"/" + _translate(component, case_sensitive=case_sensitive)
)
levels += 1
re_patterns.append("/" + _translate(component))
re_glob = "(?ms)^" + "".join(re_patterns) + ("/$" if pattern.endswith("/") else "$")
return (
levels,
recursive,
re.compile(re_glob, 0 if case_sensitive else re.IGNORECASE),
)
return pattern.count("/") + 1 if not recursive else None, re_glob


def match(pattern, path):
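A hedged sketch of the two private helpers changed above, exercised the way the new unit tests do: `_translate` handles single-level wildcards only (and now rejects `"**"`), while `_translate_glob` returns the depth hint alongside the regular expression, with `None` standing in for the unbounded `"**"` case.

```python
# Illustrative only; these are private helpers, behaviour per the new tests.
import re

from fs import glob

assert re.match(glob._translate("foo?py"), "fooapy")
assert not re.match(glob._translate("foo?py"), "foo/py")  # "?" never crosses "/"

try:
    glob._translate("foo/**/bar")
except ValueError:
    pass  # "**" is only supported by _translate_glob

levels, regex = glob._translate_glob("bar/*.py")
# levels == 2 (number of "/" plus one); re.match(regex, "/bar/foo.py") succeeds

levels, regex = glob._translate_glob("foo/**/bar")
# levels is None: "**" makes the depth unbounded
```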
@@ -146,10 +153,11 @@ def match(pattern, path):
"""
try:
levels, recursive, re_pattern = _PATTERN_CACHE[(pattern, True)]
levels, re_pattern = _PATTERN_CACHE[(pattern, True)]
except KeyError:
levels, recursive, re_pattern = _translate_glob(pattern, case_sensitive=True)
_PATTERN_CACHE[(pattern, True)] = (levels, recursive, re_pattern)
levels, re_str = _translate_glob(pattern)
re_pattern = re.compile(re_str)
_PATTERN_CACHE[(pattern, True)] = (levels, re_pattern)
if path and path[0] != "/":
path = "/" + path
return bool(re_pattern.match(path))
@@ -168,10 +176,11 @@ def imatch(pattern, path):
"""
try:
levels, recursive, re_pattern = _PATTERN_CACHE[(pattern, False)]
levels, re_pattern = _PATTERN_CACHE[(pattern, False)]
except KeyError:
levels, recursive, re_pattern = _translate_glob(pattern, case_sensitive=True)
_PATTERN_CACHE[(pattern, False)] = (levels, recursive, re_pattern)
levels, re_str = _translate_glob(pattern)
re_pattern = re.compile(re_str, re.IGNORECASE)
_PATTERN_CACHE[(pattern, False)] = (levels, re_pattern)
if path and path[0] != "/":
path = "/" + path
return bool(re_pattern.match(path))
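For reference, a short usage sketch of the public `match`/`imatch` helpers above; the expected results come straight from the test table in tests/test_glob.py, and repeated calls hit `_PATTERN_CACHE`.

```python
# Usage sketch; results taken from the parameterized test cases further down.
from fs import glob

glob.match("*.py", "/test.py")    # True
glob.match("**/", "/test.py")     # False: a trailing "/" only matches directories
glob.imatch("*.py", "/TEST.PY")   # True: case-insensitive variant
```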
@@ -186,7 +195,7 @@ def match_any(patterns, path):
Arguments:
patterns (list): A list of wildcard pattern, e.g ``["*.py",
"*.pyc"]``
name (str): A filename.
path (str): A resource path.
Returns:
bool: `True` if the path matches at least one of the patterns.
@@ -206,7 +215,7 @@ def imatch_any(patterns, path):
Arguments:
patterns (list): A list of wildcard pattern, e.g ``["*.py",
"*.pyc"]``
name (str): A filename.
path (str): A resource path.
Returns:
bool: `True` if the path matches at least one of the patterns.
@@ -227,29 +236,30 @@ def get_matcher(patterns, case_sensitive, accept_prefix=False):
case_sensitive (bool): If ``True``, then the callable will be case
sensitive, otherwise it will be case insensitive.
accept_prefix (bool): If ``True``, the name is
not required to match the wildcards themselves
not required to match the patterns themselves
but only need to be a prefix of a string that does.
Returns:
callable: a matcher that will return `True` if the paths given as
an argument matches any of the given patterns.
an argument matches any of the given patterns, or if no patterns
exist.
Example:
>>> from fs import wildcard
>>> is_python = wildcard.get_matcher(['*.py'], True)
>>> from fs import glob
>>> is_python = glob.get_matcher(['*.py'], True)
>>> is_python('__init__.py')
True
>>> is_python('foo.txt')
False
"""
if not patterns:
return lambda name: True
return lambda path: True

if accept_prefix:
new_patterns = []
for pattern in patterns:
split = _split_pattern_by_rec(pattern)
split = _split_pattern_by_sep(pattern)
for i in range(1, len(split)):
new_pattern = "/".join(split[:i])
new_patterns.append(new_pattern)
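A hedged sketch of what the `accept_prefix` branch above builds: every "/"-separated prefix of each pattern is added as an extra pattern, so a partially walked directory path can still match. The helper below is a stand-in for `_split_pattern_by_sep`; the remainder of `get_matcher` is elided in this hunk.

```python
# Illustrative only; mirrors the loop shown above.
def prefix_patterns(pattern):
    parts = pattern.split("/")  # stand-in for _split_pattern_by_sep
    return ["/".join(parts[:i]) for i in range(1, len(parts))]

prefix_patterns("foo/**/*.py")   # ['foo', 'foo/**']
```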
@@ -309,18 +319,15 @@ def __repr__(self):
def _make_iter(self, search="breadth", namespaces=None):
# type: (str, List[str]) -> Iterator[GlobMatch]
try:
levels, recursive, re_pattern = _PATTERN_CACHE[
(self.pattern, self.case_sensitive)
]
levels, re_pattern = _PATTERN_CACHE[(self.pattern, self.case_sensitive)]
except KeyError:
levels, recursive, re_pattern = _translate_glob(
self.pattern, case_sensitive=self.case_sensitive
)
levels, re_str = _translate_glob(self.pattern)
re_pattern = re.compile(re_str, 0 if self.case_sensitive else re.IGNORECASE)

for path, info in self.fs.walk.info(
path=self.path,
namespaces=namespaces or self.namespaces,
max_depth=None if recursive else levels,
max_depth=levels,
search=search,
exclude_dirs=self.exclude_dirs,
):
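The `_make_iter` change above is what the depth hint is for: a pattern with a fixed number of path components caps the walk with `max_depth`, while a `"**"` pattern (levels of `None`) walks without a bound. A small, hedged example through the public globber:

```python
# Sketch only; the depth cap is an internal optimisation of Globber._make_iter.
from fs import open_fs

my_fs = open_fs("mem://")
my_fs.makedirs("a/b/c")
my_fs.writetext("a/top.py", "print('hi')")

[m.path for m in my_fs.glob("*/*.py")]    # ['/a/top.py'], walked with max_depth=2
[m.path for m in my_fs.glob("**/*.py")]   # ['/a/top.py'], walked without a depth cap
```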
8 changes: 4 additions & 4 deletions fs/walk.py
@@ -86,21 +86,21 @@ def __init__(
a list of filename patterns, e.g. ``["~*"]``. Files matching
any of these patterns will be removed from the walk.
filter_dirs (list, optional): A list of patterns that will be used
to match directories paths. The walk will only open directories
to match directories names. The walk will only open directories
that match at least one of these patterns. Directories will
only be returned if the final component matches one of the
patterns.
exclude_dirs (list, optional): A list of patterns that will be
used to filter out directories from the walk. e.g.
``['*.svn', '*.git']``. Directories matching any of these
``['*.svn', '*.git']``. Directory names matching any of these
patterns will be removed from the walk.
max_depth (int, optional): Maximum directory depth to walk.
filter_glob (list, optional): If supplied, this parameter
should be a list of path patterns e.g. ``["foo/**/*.py"]``.
Resources will only be returned if their global path or
an extension of it matches one of the patterns.
exclude_glob (list, optional): If supplied, this parameter
should be a list of path patterns e.g. ``["foo/**/*.py"]``.
should be a list of path patterns e.g. ``["foo/**/*.pyc"]``.
Resources will not be returned if their global path or
an extension of it matches one of the patterns.
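A hedged usage sketch of the two new parameters documented above, assuming the behaviour described in this docstring: `filter_glob` keeps only resources whose global path can match one of the patterns, and `exclude_glob` drops them.

```python
# Sketch only; the parameter names are the ones added in this PR.
from fs import open_fs
from fs.walk import Walker

my_fs = open_fs("mem://")
my_fs.makedirs("foo/bar")
my_fs.writetext("foo/bar/baz.py", "")
my_fs.writetext("foo/bar/baz.pyc", "")

walker = Walker(filter_glob=["foo/**/*.py"], exclude_glob=["**/*.pyc"])
list(walker.files(my_fs))   # ['/foo/bar/baz.py']
```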
@@ -215,7 +215,7 @@ def _iter_walk(
def _check_open_dir(self, fs, path, info):
# type: (FS, Text, Info) -> bool
"""Check if a directory should be considered in the walk."""
full_path = ("" if path == "/" else path) + "/" + info.name
full_path = combine(path, info.name)
if self.exclude_dirs is not None and fs.match(self.exclude_dirs, info.name):
return False
if self.exclude_glob is not None and fs.match_glob(
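For context on the `_check_open_dir` change above: `fs.path.combine` joins a parent path and a child name, and already handles the root case that the old concatenation special-cased.

```python
# fs.path.combine is an existing helper; shown here for reference.
from fs.path import combine

combine("/foo", "bar")   # '/foo/bar'
combine("/", "bar")      # '/bar'
```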
61 changes: 55 additions & 6 deletions tests/test_glob.py
@@ -1,7 +1,10 @@
from __future__ import unicode_literals

import re
import unittest

from parameterized import parameterized

from fs import glob
from fs import open_fs

@@ -18,8 +21,8 @@ def setUp(self):
fs.makedirs("a/b/c/").writetext("foo.py", "import fs")
repr(fs.glob)

def test_match(self):
tests = [
@parameterized.expand(
[
("*.?y", "/test.py", True),
("*.py", "/test.py", True),
("*.py", "__init__.py", True),
@@ -42,11 +45,11 @@ def test_match(self):
("**/", "/test/", True),
("**/", "/test.py", False),
]
for pattern, path, expected in tests:
self.assertEqual(glob.match(pattern, path), expected, msg=(pattern, path))
)
def test_match(self, pattern, path, expected):
self.assertEqual(glob.match(pattern, path), expected, msg=(pattern, path))
# Run a second time to test cache
for pattern, path, expected in tests:
self.assertEqual(glob.match(pattern, path), expected, msg=(pattern, path))
self.assertEqual(glob.match(pattern, path), expected, msg=(pattern, path))

def test_count_1dir(self):
globber = glob.BoundGlobber(self.fs)
@@ -100,3 +103,49 @@ def test_remove_all(self):
globber = glob.BoundGlobber(self.fs)
globber("**").remove()
self.assertEqual(sorted(self.fs.listdir("/")), [])

translate_test_cases = [
("foo.py", ["foo.py"], ["Foo.py", "foo_py", "foo", ".py"]),
("foo?py", ["foo.py", "fooapy"], ["foo/py", "foopy", "fopy"]),
("bar/foo.py", ["bar/foo.py"], []),
("bar?foo.py", ["barafoo.py"], ["bar/foo.py"]),
("???.py", ["foo.py", "bar.py", "FOO.py"], [".py", "foo.PY"]),
("bar/*.py", ["bar/.py", "bar/foo.py"], ["bar/foo"]),
("bar/foo*.py", ["bar/foo.py", "bar/foobaz.py"], ["bar/foo", "bar/.py"]),
("*/[bar]/foo.py", ["/b/foo.py", "x/a/foo.py", "/r/foo.py"], ["b/foo.py", "/bar/foo.py"]),
("[!bar]/foo.py", ["x/foo.py"], ["//foo.py"]),
("[.py", ["[.py"], [".py", "."]),
]

@parameterized.expand(translate_test_cases)
def test_translate(self, glob_pattern, expected_matches, expected_not_matches):
translated = glob._translate(glob_pattern)
for m in expected_matches:
self.assertTrue(re.match(translated, m))
for m in expected_not_matches:
self.assertFalse(re.match(translated, m))

@parameterized.expand(translate_test_cases)
def test_translate_glob_simple(self, glob_pattern, expected_matches, expected_not_matches):
levels, translated = glob._translate_glob(glob_pattern)
self.assertEqual(levels, glob_pattern.count("/") + 1)
for m in expected_matches:
self.assertTrue(re.match(translated, "/" + m))
for m in expected_not_matches:
self.assertFalse(re.match(translated, m))
self.assertFalse(re.match(translated, "/" + m))

@parameterized.expand(
[
("foo/**/bar", ["/foo/bar", "/foo/baz/bar", "/foo/baz/qux/bar"], ["/foo"]),
("**/*/bar", ["/foo/bar", "/foo/bar"], ["/bar", "/bar"]),
("/**/foo/**/bar", ["/baz/foo/qux/bar", "/foo/bar"], ["/bar"]),
]
)
def test_translate_glob(self, glob_pattern, expected_matches, expected_not_matches):
levels, translated = glob._translate_glob(glob_pattern)
self.assertIsNone(levels)
for m in expected_matches:
self.assertTrue(re.match(translated, m))
for m in expected_not_matches:
self.assertFalse(re.match(translated, m))
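The tests above rely on `parameterized.expand` (a third-party package, installed with `pip install parameterized`), which turns each tuple into an individually reported test case. A minimal sketch of the pattern:

```python
# Minimal sketch of the decorator pattern used in the new tests.
import unittest

from parameterized import parameterized

from fs import glob


class ExampleTest(unittest.TestCase):
    @parameterized.expand([("*.py", "/test.py", True)])
    def test_match(self, pattern, path, expected):
        # One generated test per tuple, reported as test_match_0, etc.
        self.assertEqual(glob.match(pattern, path), expected)
```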
