Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,20 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [2.0.16] - 2017-11-11

### Added

- fs.parts

### Fixed

- Walk now yields Step named tuples as advertised

### Added

- Added max_depth parameter to fs.walk

## [2.0.15] - 2017-11-05

### Changed
Expand Down
2 changes: 1 addition & 1 deletion fs/_version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Version, used in module and setup.py.
"""
__version__ = "2.0.15"
__version__ = "2.0.16"
24 changes: 24 additions & 0 deletions fs/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"iteratepath",
"join",
"normpath",
"parts",
"recursepath",
"relativefrom",
"relpath",
Expand Down Expand Up @@ -256,6 +257,29 @@ def combine(path1, path2):
return "{}/{}".format(path1.rstrip('/'), path2.lstrip('/'))


def parts(path):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"""Split a path in to its component parts.

Arguments:
path (str): Path to split in to parts.

Returns:
list: List of components

Example:
>>> parts('/foo/bar/baz')
['/', 'foo', 'bar', 'baz']

"""
_path = normpath(path)
components = _path.strip('/')

_parts = ['/' if _path.startswith('/') else './']
if components:
_parts += components.split('/')
return _parts


def split(path):
"""Split a path into (head, tail) pair.

Expand Down
85 changes: 69 additions & 16 deletions fs/walk.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class Walker(WalkerBase):
be returned if the final component matches one of the patterns.
exclude_dirs (list, optional): A list of patterns that will be used
to filter out directories from the walk. e.g. ``['*.svn', '*.git']``.
max_depth (int, optional): Maximum directory depth to walk.

"""

Expand All @@ -131,7 +132,8 @@ def __init__(self,
on_error=None,
search="breadth",
filter=None,
exclude_dirs=None):
exclude_dirs=None,
max_depth=None):
if search not in ('breadth', 'depth'):
raise ValueError("search must be 'breadth' or 'depth'")
self.ignore_errors = ignore_errors
Expand All @@ -153,6 +155,7 @@ def __init__(self,
self.search = search
self.filter = filter
self.exclude_dirs = exclude_dirs
self.max_depth = max_depth
super(Walker, self).__init__()

@classmethod
Expand All @@ -165,6 +168,14 @@ def _raise_errors(cls, path, error):
"""Callback to re-raise dir scan errors."""
return False

@classmethod
def _calculate_depth(cls, path):
"""Calculate the 'depth' of a directory path (number of
components).
"""
_path = path.strip('/')
return _path.count('/') + 1 if _path else 0

@classmethod
def bind(cls, fs):
"""Bind a `Walker` instance to a given filesystem.
Expand Down Expand Up @@ -208,7 +219,8 @@ def __repr__(self):
on_error=(self.on_error, None),
search=(self.search, 'breadth'),
filter=(self.filter, None),
exclude_dirs=(self.exclude_dirs, None)
exclude_dirs=(self.exclude_dirs, None),
max_depth=(self.max_depth, None)
)

def filter_files(self, fs, infos):
Expand All @@ -232,23 +244,53 @@ def filter_files(self, fs, infos):
if _check_file(fs, info)
]

def _check_open_dir(self, fs, path, info):
"""Check if a directory should be considered in the walk.
"""
if (self.exclude_dirs is not None and
fs.match(self.exclude_dirs, info.name)):
return False
return self.check_open_dir(fs, path, info)

def check_open_dir(self, fs, info):
def check_open_dir(self, fs, path, info):
"""Check if a directory should be opened.

Override to exclude directories from the walk.

Arguments:
fs (FS): A filesystem instance.
info (Info): A resource info object.
path (str): Path to directory.
info (Info): A resource info object for the directory.

Returns:
bool: `True` if the directory should be opened.

"""
if self.exclude_dirs is None:
return True
return not fs.match(self.exclude_dirs, info.name)
return True

def _check_scan_dir(self, fs, path, info, depth):
"""Check if a directory contents should be scanned."""
if self.max_depth is not None and depth >= self.max_depth:
return False
return self.check_scan_dir(fs, path, info)

def check_scan_dir(self, fs, path, info):
"""Check if a directory should be scanned.

Override to omit scanning of certain directories. If a directory
is omitted, it will appear in the walk but its files and
sub-directories will not.

Arguments:
fs (FS): A filesystem instance.
path (str): Path to directory.
info (Info): A resource info object for the directory.

Returns:
bool: `True` if the directory should be scanned.

"""
return True

def check_file(self, fs, info):
"""Check if a filename should be included.
Expand Down Expand Up @@ -329,19 +371,22 @@ def _walk_breadth(self, fs, path, namespaces=None):
queue = deque([path])
push = queue.appendleft
pop = queue.pop
depth = self._calculate_depth(path)

while queue:
dir_path = pop()
dirs = []
files = []
for info in self._scan(fs, dir_path, namespaces=namespaces):
if info.is_dir:
if self.check_open_dir(fs, info):
_depth = self._calculate_depth(dir_path) - depth + 1
if self._check_open_dir(fs, dir_path, info):
dirs.append(info)
push(join(dir_path, info.name))
if self._check_scan_dir(fs, dir_path, info, _depth):
push(join(dir_path, info.name))
else:
files.append(info)
yield (
yield Step(
dir_path,
dirs,
self.filter_files(fs, files)
Expand All @@ -353,8 +398,10 @@ def _walk_depth(self, fs, path, namespaces=None):
# No recursion!

def scan(path):
"""Perform scan."""
return self._scan(fs, path, namespaces=namespaces)

depth = self._calculate_depth(path)
stack = [(
path, scan(path), [], []
)]
Expand All @@ -365,20 +412,22 @@ def scan(path):
try:
info = next(iter_files)
except StopIteration:
yield (
yield Step(
dir_path,
dirs,
self.filter_files(fs, files)
)
del stack[-1]
else:
if info.is_dir:
if self.check_open_dir(fs, info):
_depth = self._calculate_depth(dir_path) - depth + 1
if self._check_open_dir(fs, dir_path, info):
dirs.append(info)
_path = join(dir_path, info.name)
push((
_path, scan(_path), [], []
))
if self._check_scan_dir(fs, dir_path, info, _depth):
_path = join(dir_path, info.name)
push((
_path, scan(_path), [], []
))
else:
files.append(info)

Expand Down Expand Up @@ -448,6 +497,7 @@ def walk(self,
exclude_dirs (list): A list of patterns that will be used
to filter out directories from the walk, e.g. ``['*.svn',
'*.git']``.
max_depth (int, optional): Maximum directory depth to walk.

Returns:
~collections.Iterator: an iterator of ``(<path>, <dirs>, <files>)``
Expand Down Expand Up @@ -495,6 +545,7 @@ def files(self, path='/', **kwargs):
exclude_dirs (list): A list of patterns that will be used
to filter out directories from the walk, e.g. ``['*.svn',
'*.git']``.
max_depth (int, optional): Maximum directory depth to walk.

Returns:
~collections.Iterable: An iterable of file paths (absolute
Expand Down Expand Up @@ -525,6 +576,7 @@ def dirs(self, path='/', **kwargs):
exclude_dirs (list): A list of patterns that will be used
to filter out directories from the walk, e.g. ``['*.svn',
'*.git']``.
max_depth (int, optional): Maximum directory depth to walk.

Returns:
~collections.iterable: an iterable of directory paths
Expand Down Expand Up @@ -562,6 +614,7 @@ def info(self, path='/', namespaces=None, **kwargs):
exclude_dirs (list): A list of patterns that will be used
to filter out directories from the walk, e.g. ``['*.svn',
'*.git']``.
max_depth (int, optional): Maximum directory depth to walk.

Returns:
~collections.Iterable: an iterable yielding tuples of
Expand Down
1 change: 0 additions & 1 deletion fs/zipfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from __future__ import unicode_literals

import zipfile
import stat

from datetime import datetime

Expand Down
8 changes: 8 additions & 0 deletions tests/test_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ def test_combine(self):
self.assertEqual(combine('', 'bar'), 'bar')
self.assertEqual(combine('foo', 'bar'), 'foo/bar')

def test_parts(self):
self.assertEqual(parts('/'), ['/'])
self.assertEqual(parts(''), ['./'])
self.assertEqual(parts('/foo'), ['/', 'foo'])
self.assertEqual(parts('/foo/bar'), ['/', 'foo', 'bar'])
self.assertEqual(parts('/foo/bar/'), ['/', 'foo', 'bar'])
self.assertEqual(parts('./foo/bar/'), ['./', 'foo', 'bar'])

def test_pathsplit(self):
tests = [
("a/b", ("a", "b")),
Expand Down
44 changes: 40 additions & 4 deletions tests/test_walk.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,51 @@ def test_repr(self):
repr(self.fs.walk)

def test_walk(self):
walk = []
for path, dirs, files in self.fs.walk():
walk.append((
_walk = []
for step in self.fs.walk():
self.assertIsInstance(step, walk.Step)
path, dirs, files = step
_walk.append((
path,
[info.name for info in dirs],
[info.name for info in files]
))
expected = [(u'/', [u'foo1', u'foo2', u'foo3'], []), (u'/foo1', [u'bar1'], [u'top1.txt', u'top2.txt']), (u'/foo2', [u'bar2'], [u'top3.txt']), (u'/foo3', [], []), (u'/foo1/bar1', [], []), (u'/foo2/bar2', [u'bar3'], []), (u'/foo2/bar2/bar3', [], [u'test.txt'])]
self.assertEqual(walk, expected)
self.assertEqual(_walk, expected)

def test_walk_directory(self):
_walk = []
for step in self.fs.walk('foo2'):
self.assertIsInstance(step, walk.Step)
path, dirs, files = step
_walk.append((
path,
[info.name for info in dirs],
[info.name for info in files]
))
expected = [(u'/foo2', [u'bar2'], [u'top3.txt']), (u'/foo2/bar2', [u'bar3'], []), (u'/foo2/bar2/bar3', [], [u'test.txt'])]
self.assertEqual(_walk, expected)

def test_walk_levels_1(self):
results = list(self.fs.walk(max_depth=1))
self.assertEqual(len(results), 1)
dirs = sorted(info.name for info in results[0].dirs)
self.assertEqual(dirs, ['foo1', 'foo2', 'foo3'])
files = sorted(info.name for info in results[0].files)
self.assertEqual(files, [])

def test_walk_levels_2(self):
_walk = []
for step in self.fs.walk(max_depth=2):
self.assertIsInstance(step, walk.Step)
path, dirs, files = step
_walk.append((
path,
sorted(info.name for info in dirs),
sorted(info.name for info in files)
))
expected = [(u'/', [u'foo1', u'foo2', u'foo3'], []), (u'/foo1', [u'bar1'], [u'top1.txt', u'top2.txt']), (u'/foo2', [u'bar2'], [u'top3.txt']), (u'/foo3', [], [])]
self.assertEqual(_walk, expected)

def test_walk_files(self):
files = list(self.fs.walk.files())
Expand Down