-
-
Notifications
You must be signed in to change notification settings - Fork 174
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
220 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
""" | ||
fs.base | ||
======== | ||
======= | ||
PyFilesystem base class | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
""" | ||
fs.mirror | ||
========= | ||
Create a duplicate of a filesystem. | ||
Mirroring will create a copy of a source filesystem on a destination | ||
filesystem. If there are no files on the destination, then mirroring | ||
is simply a straight copy. If there are any files or directories on the | ||
destination they may be deleted or modified to match the source. | ||
""" | ||
|
||
from __future__ import print_function | ||
from __future__ import unicode_literals | ||
|
||
|
||
from .copy import copy_file | ||
from .errors import ResourceNotFound | ||
from .walk import Walker | ||
from .opener import manage_fs | ||
|
||
|
||
def _compare(info1, info2): | ||
""" | ||
Compare two (file) info objects and return True if the file should | ||
be copied, or False if they should not. | ||
""" | ||
# Check filesize has changed | ||
if info1.size != info2.size: | ||
return True | ||
# Check modified dates | ||
date1 = info1.modified | ||
date2 = info2.modified | ||
return date1 is None or date2 is None or date1 > date2 | ||
|
||
|
||
def mirror(src_fs, dst_fs, walker=None, copy_if_newer=True): | ||
""" | ||
Mirror files / directories from one filesystem to another. | ||
:param src_fs: A source filesystem. | ||
:type src_fs: FS URL or instance. | ||
:param dst_fs: A destination filesystem. | ||
:type dst_fs: FS URL or instance. | ||
:param walker: An optional waler instance. | ||
:type walker: :class:`~fs.walk.Walker` | ||
:param bool copy_if_newer: Only copy newer files. | ||
Mirroring a filesystem will create an exact copy of ``src_fs`` on | ||
``dst_fs``, by removing any files / directories on the destination | ||
that aren't on the source, and copying files that aren't. | ||
""" | ||
with manage_fs(src_fs, writeable=False) as _src_fs: | ||
with manage_fs(dst_fs, create=True) as _dst_fs: | ||
with _src_fs.lock(), _dst_fs.lock(): | ||
return _mirror( | ||
_src_fs, | ||
_dst_fs, | ||
walker=walker, | ||
copy_if_newer=copy_if_newer | ||
) | ||
|
||
|
||
def _mirror(src_fs, dst_fs, walker=None, copy_if_newer=True): | ||
walker = walker or Walker() | ||
walk = walker.walk(src_fs, namespaces=['details']) | ||
for path, dirs, files in walk: | ||
try: | ||
dst = { | ||
info.name: info | ||
for info in dst_fs.scandir(path, namespaces=['details']) | ||
} | ||
except ResourceNotFound: | ||
dst_fs.makedir(path) | ||
dst = {} | ||
|
||
# Copy files | ||
for _file in files: | ||
_path = _file.make_path(path) | ||
dst_file = dst.pop(_file.name, None) | ||
if dst_file is not None: | ||
if dst_file.is_dir: | ||
# Destination is a directory, remove it | ||
dst_fs.removetree(_path) | ||
else: | ||
# Compare file info | ||
if copy_if_newer and not _compare(_file, dst_file): | ||
continue | ||
copy_file(src_fs, _path, dst_fs, _path) | ||
|
||
# Make directories | ||
for _dir in dirs: | ||
_path = _dir.make_path(path) | ||
dst_dir = dst.pop(_dir.name, None) | ||
if dst_dir is not None: | ||
# Directory name exists on dst | ||
if not dst_dir.is_dir: | ||
# Not a directory, so remove it | ||
dst_fs.remove(_path) | ||
else: | ||
# Make the directory in dst | ||
dst_fs.makedir(_path, recreate=True) | ||
|
||
# Remove any remaining resources | ||
while dst: | ||
_, info = dst.popitem() | ||
_path = info.make_path(path) | ||
if info.is_dir: | ||
dst_fs.removetree(_path) | ||
else: | ||
dst_fs.remove(_path) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
from __future__ import unicode_literals | ||
|
||
import unittest | ||
|
||
from fs.mirror import mirror | ||
from fs import open_fs | ||
|
||
|
||
class TestMirror(unittest.TestCase): | ||
|
||
|
||
def _contents(self, fs): | ||
"""Extract an FS in to a simple data structure.""" | ||
contents = [] | ||
for path, dirs, files in fs.walk(): | ||
for info in dirs: | ||
_path = info.make_path(path) | ||
contents.append((_path, 'dir', b'')) | ||
for info in files: | ||
_path = info.make_path(path) | ||
contents.append((_path, 'file', fs.getbytes(_path))) | ||
return sorted(contents) | ||
|
||
def assert_compare_fs(self, fs1, fs2): | ||
"""Assert filesystems and contents are the same.""" | ||
self.assertEqual( | ||
self._contents(fs1), | ||
self._contents(fs2) | ||
) | ||
|
||
def test_empty_mirror(self): | ||
m1 = open_fs('mem://') | ||
m2 = open_fs('mem://') | ||
mirror(m1, m2) | ||
self.assert_compare_fs(m1, m2) | ||
|
||
def test_mirror_one_file(self): | ||
m1 = open_fs('mem://') | ||
m1.settext('foo', 'hello') | ||
m2 = open_fs('mem://') | ||
mirror(m1, m2) | ||
self.assert_compare_fs(m1, m2) | ||
|
||
def test_mirror_one_file_one_dir(self): | ||
m1 = open_fs('mem://') | ||
m1.settext('foo', 'hello') | ||
m1.makedir('bar') | ||
m2 = open_fs('mem://') | ||
mirror(m1, m2) | ||
self.assert_compare_fs(m1, m2) | ||
|
||
def test_mirror_delete_replace(self): | ||
m1 = open_fs('mem://') | ||
m1.settext('foo', 'hello') | ||
m1.makedir('bar') | ||
m2 = open_fs('mem://') | ||
mirror(m1, m2) | ||
self.assert_compare_fs(m1, m2) | ||
m2.remove('foo') | ||
mirror(m1, m2) | ||
self.assert_compare_fs(m1, m2) | ||
m2.removedir('bar') | ||
mirror(m1, m2) | ||
self.assert_compare_fs(m1, m2) | ||
|
||
def test_mirror_extra_dir(self): | ||
m1 = open_fs('mem://') | ||
m1.settext('foo', 'hello') | ||
m1.makedir('bar') | ||
m2 = open_fs('mem://') | ||
m2.makedir('baz') | ||
mirror(m1, m2) | ||
self.assert_compare_fs(m1, m2) | ||
|
||
def test_mirror_extra_file(self): | ||
m1 = open_fs('mem://') | ||
m1.settext('foo', 'hello') | ||
m1.makedir('bar') | ||
m2 = open_fs('mem://') | ||
m2.makedir('baz') | ||
m2.touch('egg') | ||
mirror(m1, m2) | ||
self.assert_compare_fs(m1, m2) | ||
|
||
def test_mirror_wrong_type(self): | ||
m1 = open_fs('mem://') | ||
m1.settext('foo', 'hello') | ||
m1.makedir('bar') | ||
m2 = open_fs('mem://') | ||
m2.makedir('foo') | ||
m2.touch('bar') | ||
mirror(m1, m2) | ||
self.assert_compare_fs(m1, m2) | ||
|
||
def test_mirror_update(self): | ||
m1 = open_fs('mem://') | ||
m1.settext('foo', 'hello') | ||
m1.makedir('bar') | ||
m2 = open_fs('mem://') | ||
mirror(m1, m2) | ||
self.assert_compare_fs(m1, m2) | ||
m2.appendtext('foo', ' world!') | ||
mirror(m1, m2) | ||
self.assert_compare_fs(m1, m2) |