diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b80ff81..c99ffcc3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,11 +20,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/). Closes [#445](https://github.com/PyFilesystem/pyfilesystem2/pull/445). - Migrate continuous integration from Travis-CI to GitHub Actions and introduce several linters again in the build steps ([#448](https://github.com/PyFilesystem/pyfilesystem2/pull/448)). - Closes [#446](https://github.com/PyFilesystem/pyfilesystem2/pull/446). + Closes [#446](https://github.com/PyFilesystem/pyfilesystem2/issues/446). - Stop requiring `pytest` to run tests, allowing any test runner supporting `unittest`-style test suites. - `FSTestCases` now builds the large data required for `upload` and `download` tests only once in order to reduce the total testing time. +- `MemoryFS.move` and `MemoryFS.movedir` will now avoid copying data. + Closes [#452](https://github.com/PyFilesystem/pyfilesystem2/issues/452). ### Fixed diff --git a/fs/memoryfs.py b/fs/memoryfs.py index 30e9c21f..7965a135 100644 --- a/fs/memoryfs.py +++ b/fs/memoryfs.py @@ -38,6 +38,7 @@ SupportsInt, Union, Text, + Tuple, ) from .base import _OpendirFactory from .info import RawInfo @@ -274,6 +275,10 @@ def remove_entry(self, name): # type: (Text) -> None del self._dir[name] + def clear(self): + # type: () -> None + self._dir.clear() + def __contains__(self, name): # type: (object) -> bool return name in self._dir @@ -294,6 +299,21 @@ def remove_open_file(self, memory_file): # type: (_MemoryFile) -> None self._open_files.remove(memory_file) + def to_info(self, namespaces=None): + # type: (Optional[Collection[Text]]) -> Info + namespaces = namespaces or () + info = {"basic": {"name": self.name, "is_dir": self.is_dir}} + if "details" in namespaces: + info["details"] = { + "_write": ["accessed", "modified"], + "type": int(self.resource_type), + "size": self.size, + "accessed": self.accessed_time, + "modified": self.modified_time, + "created": self.created_time, + } + return Info(info) + @six.python_2_unicode_compatible class MemoryFS(FS): @@ -368,33 +388,24 @@ def close(self): def getinfo(self, path, namespaces=None): # type: (Text, Optional[Collection[Text]]) -> Info - namespaces = namespaces or () _path = self.validatepath(path) dir_entry = self._get_dir_entry(_path) if dir_entry is None: raise errors.ResourceNotFound(path) - info = {"basic": {"name": dir_entry.name, "is_dir": dir_entry.is_dir}} - if "details" in namespaces: - info["details"] = { - "_write": ["accessed", "modified"], - "type": int(dir_entry.resource_type), - "size": dir_entry.size, - "accessed": dir_entry.accessed_time, - "modified": dir_entry.modified_time, - "created": dir_entry.created_time, - } - return Info(info) + return dir_entry.to_info(namespaces=namespaces) def listdir(self, path): # type: (Text) -> List[Text] self.check() _path = self.validatepath(path) with self._lock: + # locate and validate the entry corresponding to the given path dir_entry = self._get_dir_entry(_path) if dir_entry is None: raise errors.ResourceNotFound(path) if not dir_entry.is_dir: raise errors.DirectoryExpected(path) + # return the filenames in the order they were created return dir_entry.list() if typing.TYPE_CHECKING: @@ -433,6 +444,46 @@ def makedir( parent_dir.set_entry(dir_name, new_dir) return self.opendir(path) + def move(self, src_path, dst_path, overwrite=False): + src_dir, src_name = split(self.validatepath(src_path)) + dst_dir, dst_name = split(self.validatepath(dst_path)) + + with self._lock: + src_dir_entry = self._get_dir_entry(src_dir) + if src_dir_entry is None or src_name not in src_dir_entry: + raise errors.ResourceNotFound(src_path) + src_entry = src_dir_entry.get_entry(src_name) + if src_entry.is_dir: + raise errors.FileExpected(src_path) + + dst_dir_entry = self._get_dir_entry(dst_dir) + if dst_dir_entry is None: + raise errors.ResourceNotFound(dst_path) + elif not overwrite and dst_name in dst_dir_entry: + raise errors.DestinationExists(dst_path) + + dst_dir_entry.set_entry(dst_name, src_entry) + src_dir_entry.remove_entry(src_name) + + def movedir(self, src_path, dst_path, create=False): + src_dir, src_name = split(self.validatepath(src_path)) + dst_dir, dst_name = split(self.validatepath(dst_path)) + + with self._lock: + src_dir_entry = self._get_dir_entry(src_dir) + if src_dir_entry is None or src_name not in src_dir_entry: + raise errors.ResourceNotFound(src_path) + src_entry = src_dir_entry.get_entry(src_name) + if not src_entry.is_dir: + raise errors.DirectoryExpected(src_path) + + dst_dir_entry = self._get_dir_entry(dst_dir) + if dst_dir_entry is None or (not create and dst_name not in dst_dir_entry): + raise errors.ResourceNotFound(dst_path) + + dst_dir_entry.set_entry(dst_name, src_entry) + src_dir_entry.remove_entry(src_name) + def openbin(self, path, mode="r", buffering=-1, **options): # type: (Text, Text, int, **Any) -> BinaryIO _mode = Mode(mode) @@ -499,12 +550,29 @@ def remove(self, path): def removedir(self, path): # type: (Text) -> None + # make sure we are not removing root _path = self.validatepath(path) - if _path == "/": raise errors.RemoveRootError() + # make sure the directory is empty + if not self.isempty(path): + raise errors.DirectoryNotEmpty(path) + # we can now delegate to removetree since we confirmed that + # * path exists (isempty) + # * path is a folder (isempty) + # * path is not root + self.removetree(_path) + + def removetree(self, path): + # type: (Text) -> None + _path = self.validatepath(path) with self._lock: + + if _path == "/": + self.root.clear() + return + dir_path, file_name = split(_path) parent_dir_entry = self._get_dir_entry(dir_path) @@ -515,11 +583,34 @@ def removedir(self, path): if not dir_dir_entry.is_dir: raise errors.DirectoryExpected(path) - if len(dir_dir_entry): - raise errors.DirectoryNotEmpty(path) - parent_dir_entry.remove_entry(file_name) + def scandir( + self, + path, # type: Text + namespaces=None, # type: Optional[Collection[Text]] + page=None, # type: Optional[Tuple[int, int]] + ): + # type: (...) -> Iterator[Info] + self.check() + _path = self.validatepath(path) + with self._lock: + # locate and validate the entry corresponding to the given path + dir_entry = self._get_dir_entry(_path) + if dir_entry is None: + raise errors.ResourceNotFound(path) + if not dir_entry.is_dir: + raise errors.DirectoryExpected(path) + # if paging was requested, slice the filenames + filenames = dir_entry.list() + if page is not None: + start, end = page + filenames = filenames[start:end] + # yield info with the right namespaces + for name in filenames: + entry = typing.cast(_DirEntry, dir_entry.get_entry(name)) + yield entry.to_info(namespaces=namespaces) + def setinfo(self, path, info): # type: (Text, RawInfo) -> None _path = self.validatepath(path)