-
-
Notifications
You must be signed in to change notification settings - Fork 173
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #470 from PyFilesystem/readonly-removetree
Fix miscellaneous bugs in `fs.wrap`
- Loading branch information
Showing
3 changed files
with
218 additions
and
69 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,97 +1,219 @@ | ||
from __future__ import unicode_literals | ||
|
||
import operator | ||
import unittest | ||
|
||
from fs import errors | ||
try: | ||
from unittest import mock | ||
except ImportError: | ||
import mock | ||
|
||
import six | ||
|
||
import fs.copy | ||
import fs.mirror | ||
import fs.move | ||
import fs.wrap | ||
import fs.errors | ||
from fs import open_fs | ||
from fs import wrap | ||
from fs.info import Info | ||
|
||
|
||
class TestWrap(unittest.TestCase): | ||
def test_readonly(self): | ||
mem_fs = open_fs("mem://") | ||
fs = wrap.read_only(mem_fs) | ||
class TestWrapReadOnly(unittest.TestCase): | ||
def setUp(self): | ||
self.fs = open_fs("mem://") | ||
self.ro = fs.wrap.read_only(self.fs) | ||
|
||
with self.assertRaises(errors.ResourceReadOnly): | ||
fs.open("foo", "w") | ||
def tearDown(self): | ||
self.fs.close() | ||
|
||
with self.assertRaises(errors.ResourceReadOnly): | ||
fs.appendtext("foo", "bar") | ||
def assertReadOnly(self, func, *args, **kwargs): | ||
self.assertRaises(fs.errors.ResourceReadOnly, func, *args, **kwargs) | ||
|
||
with self.assertRaises(errors.ResourceReadOnly): | ||
fs.appendbytes("foo", b"bar") | ||
def test_open_w(self): | ||
self.assertReadOnly(self.ro.open, "foo", "w") | ||
|
||
with self.assertRaises(errors.ResourceReadOnly): | ||
fs.makedir("foo") | ||
def test_appendtext(self): | ||
self.assertReadOnly(self.ro.appendtext, "foo", "bar") | ||
|
||
with self.assertRaises(errors.ResourceReadOnly): | ||
fs.move("foo", "bar") | ||
def test_appendbytes(self): | ||
self.assertReadOnly(self.ro.appendbytes, "foo", b"bar") | ||
|
||
with self.assertRaises(errors.ResourceReadOnly): | ||
fs.openbin("foo", "w") | ||
def test_makedir(self): | ||
self.assertReadOnly(self.ro.makedir, "foo") | ||
|
||
with self.assertRaises(errors.ResourceReadOnly): | ||
fs.remove("foo") | ||
def test_move(self): | ||
self.assertReadOnly(self.ro.move, "foo", "bar") | ||
|
||
with self.assertRaises(errors.ResourceReadOnly): | ||
fs.removedir("foo") | ||
def test_openbin_w(self): | ||
self.assertReadOnly(self.ro.openbin, "foo", "w") | ||
|
||
with self.assertRaises(errors.ResourceReadOnly): | ||
fs.setinfo("foo", {}) | ||
def test_remove(self): | ||
self.assertReadOnly(self.ro.remove, "foo") | ||
|
||
with self.assertRaises(errors.ResourceReadOnly): | ||
fs.settimes("foo", {}) | ||
def test_removedir(self): | ||
self.assertReadOnly(self.ro.removedir, "foo") | ||
|
||
with self.assertRaises(errors.ResourceReadOnly): | ||
fs.copy("foo", "bar") | ||
def test_removetree(self): | ||
self.assertReadOnly(self.ro.removetree, "foo") | ||
|
||
with self.assertRaises(errors.ResourceReadOnly): | ||
fs.create("foo") | ||
def test_setinfo(self): | ||
self.assertReadOnly(self.ro.setinfo, "foo", {}) | ||
|
||
with self.assertRaises(errors.ResourceReadOnly): | ||
fs.writetext("foo", "bar") | ||
def test_settimes(self): | ||
self.assertReadOnly(self.ro.settimes, "foo", {}) | ||
|
||
with self.assertRaises(errors.ResourceReadOnly): | ||
fs.writebytes("foo", b"bar") | ||
def test_copy(self): | ||
self.assertReadOnly(self.ro.copy, "foo", "bar") | ||
|
||
with self.assertRaises(errors.ResourceReadOnly): | ||
fs.makedirs("foo/bar") | ||
def test_create(self): | ||
self.assertReadOnly(self.ro.create, "foo") | ||
|
||
with self.assertRaises(errors.ResourceReadOnly): | ||
fs.touch("foo") | ||
def test_writetext(self): | ||
self.assertReadOnly(self.ro.writetext, "foo", "bar") | ||
|
||
with self.assertRaises(errors.ResourceReadOnly): | ||
fs.upload("foo", None) | ||
def test_writebytes(self): | ||
self.assertReadOnly(self.ro.writebytes, "foo", b"bar") | ||
|
||
with self.assertRaises(errors.ResourceReadOnly): | ||
fs.writefile("foo", None) | ||
def test_makedirs(self): | ||
self.assertReadOnly(self.ro.makedirs, "foo/bar") | ||
|
||
self.assertTrue(mem_fs.isempty("/")) | ||
mem_fs.writebytes("file", b"read me") | ||
with fs.openbin("file") as read_file: | ||
self.assertEqual(read_file.read(), b"read me") | ||
def test_touch(self): | ||
self.assertReadOnly(self.ro.touch, "foo") | ||
|
||
with fs.open("file", "rb") as read_file: | ||
self.assertEqual(read_file.read(), b"read me") | ||
def test_upload(self): | ||
self.assertReadOnly(self.ro.upload, "foo", six.BytesIO()) | ||
|
||
def test_cachedir(self): | ||
mem_fs = open_fs("mem://") | ||
mem_fs.makedirs("foo/bar/baz") | ||
mem_fs.touch("egg") | ||
def test_writefile(self): | ||
self.assertReadOnly(self.ro.writefile, "foo", six.StringIO()) | ||
|
||
fs = wrap.cache_directory(mem_fs) | ||
self.assertEqual(sorted(fs.listdir("/")), ["egg", "foo"]) | ||
self.assertEqual(sorted(fs.listdir("/")), ["egg", "foo"]) | ||
self.assertTrue(fs.isdir("foo")) | ||
self.assertTrue(fs.isdir("foo")) | ||
self.assertTrue(fs.isfile("egg")) | ||
self.assertTrue(fs.isfile("egg")) | ||
def test_openbin_r(self): | ||
self.fs.writebytes("file", b"read me") | ||
with self.ro.openbin("file") as read_file: | ||
self.assertEqual(read_file.read(), b"read me") | ||
|
||
self.assertEqual(fs.getinfo("foo"), mem_fs.getinfo("foo")) | ||
self.assertEqual(fs.getinfo("foo"), mem_fs.getinfo("foo")) | ||
def test_open_r(self): | ||
self.fs.writebytes("file", b"read me") | ||
with self.ro.open("file", "rb") as read_file: | ||
self.assertEqual(read_file.read(), b"read me") | ||
|
||
self.assertEqual(fs.getinfo("/"), mem_fs.getinfo("/")) | ||
self.assertEqual(fs.getinfo("/"), mem_fs.getinfo("/")) | ||
|
||
with self.assertRaises(errors.ResourceNotFound): | ||
fs.getinfo("/foofoo") | ||
class TestWrapReadOnlySyspath(unittest.TestCase): | ||
# If the wrapped fs has a syspath, there is a chance that somewhere | ||
# in fs.copy or fs.mirror we try to use it to our advantage, but | ||
# we want to make sure these implementations don't circumvent the | ||
# wrapper. | ||
|
||
def setUp(self): | ||
self.fs = open_fs("temp://") | ||
self.ro = fs.wrap.read_only(self.fs) | ||
self.src = open_fs("temp://") | ||
self.src.touch("foo") | ||
self.src.makedir("bar") | ||
|
||
def tearDown(self): | ||
self.fs.close() | ||
self.src.close() | ||
|
||
def assertReadOnly(self, func, *args, **kwargs): | ||
self.assertRaises(fs.errors.ResourceReadOnly, func, *args, **kwargs) | ||
|
||
def test_copy_fs(self): | ||
self.assertReadOnly(fs.copy.copy_fs, self.src, self.ro) | ||
|
||
def test_copy_fs_if_newer(self): | ||
self.assertReadOnly(fs.copy.copy_fs_if_newer, self.src, self.ro) | ||
|
||
def test_copy_file(self): | ||
self.assertReadOnly(fs.copy.copy_file, self.src, "foo", self.ro, "foo") | ||
|
||
def test_copy_file_if_newer(self): | ||
self.assertReadOnly(fs.copy.copy_file_if_newer, self.src, "foo", self.ro, "foo") | ||
|
||
def test_copy_structure(self): | ||
self.assertReadOnly(fs.copy.copy_structure, self.src, self.ro) | ||
|
||
def test_mirror(self): | ||
self.assertReadOnly(fs.mirror.mirror, self.src, self.ro) | ||
fs.mirror.mirror(self.src, self.fs) | ||
self.fs.touch("baz") | ||
self.assertReadOnly(fs.mirror.mirror, self.src, self.ro) | ||
|
||
def test_move_fs(self): | ||
self.assertReadOnly(fs.move.move_fs, self.src, self.ro) | ||
self.src.removetree("/") | ||
self.fs.touch("foo") | ||
self.assertReadOnly(fs.move.move_fs, self.ro, self.src) | ||
|
||
def test_move_file(self): | ||
self.assertReadOnly(fs.move.move_file, self.src, "foo", self.ro, "foo") | ||
self.fs.touch("baz") | ||
self.assertReadOnly(fs.move.move_file, self.ro, "baz", self.src, "foo") | ||
|
||
def test_move_dir(self): | ||
self.assertReadOnly(fs.move.move_file, self.src, "bar", self.ro, "bar") | ||
self.fs.makedir("baz") | ||
self.assertReadOnly(fs.move.move_dir, self.ro, "baz", self.src, "baz") | ||
|
||
|
||
class TestWrapCachedDir(unittest.TestCase): | ||
def setUp(self): | ||
self.fs = open_fs("mem://") | ||
self.fs.makedirs("foo/bar/baz") | ||
self.fs.touch("egg") | ||
self.cached = fs.wrap.cache_directory(self.fs) | ||
|
||
def tearDown(self): | ||
self.fs.close() | ||
|
||
def assertNotFound(self, func, *args, **kwargs): | ||
self.assertRaises(fs.errors.ResourceNotFound, func, *args, **kwargs) | ||
|
||
def test_scandir(self): | ||
key = operator.attrgetter("name") | ||
expected = [ | ||
Info({"basic": {"name": "egg", "is_dir": False}}), | ||
Info({"basic": {"name": "foo", "is_dir": True}}), | ||
] | ||
with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir: | ||
self.assertEqual(sorted(self.cached.scandir("/"), key=key), expected) | ||
scandir.assert_has_calls([mock.call('/', namespaces=None, page=None)]) | ||
with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir: | ||
self.assertEqual(sorted(self.cached.scandir("/"), key=key), expected) | ||
scandir.assert_not_called() | ||
|
||
def test_isdir(self): | ||
with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir: | ||
self.assertTrue(self.cached.isdir("foo")) | ||
self.assertFalse(self.cached.isdir("egg")) # is file | ||
self.assertFalse(self.cached.isdir("spam")) # doesn't exist | ||
scandir.assert_has_calls([mock.call('/', namespaces=None, page=None)]) | ||
with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir: | ||
self.assertTrue(self.cached.isdir("foo")) | ||
self.assertFalse(self.cached.isdir("egg")) | ||
self.assertFalse(self.cached.isdir("spam")) | ||
scandir.assert_not_called() | ||
|
||
def test_isfile(self): | ||
with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir: | ||
self.assertTrue(self.cached.isfile("egg")) | ||
self.assertFalse(self.cached.isfile("foo")) # is dir | ||
self.assertFalse(self.cached.isfile("spam")) # doesn't exist | ||
scandir.assert_has_calls([mock.call('/', namespaces=None, page=None)]) | ||
with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir: | ||
self.assertTrue(self.cached.isfile("egg")) | ||
self.assertFalse(self.cached.isfile("foo")) | ||
self.assertFalse(self.cached.isfile("spam")) | ||
scandir.assert_not_called() | ||
|
||
def test_getinfo(self): | ||
with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir: | ||
self.assertEqual(self.cached.getinfo("foo"), self.fs.getinfo("foo")) | ||
self.assertEqual(self.cached.getinfo("/"), self.fs.getinfo("/")) | ||
self.assertNotFound(self.cached.getinfo, "spam") | ||
scandir.assert_has_calls([mock.call('/', namespaces=None, page=None)]) | ||
with mock.patch.object(self.fs, "scandir", wraps=self.fs.scandir) as scandir: | ||
self.assertEqual(self.cached.getinfo("foo"), self.fs.getinfo("foo")) | ||
self.assertEqual(self.cached.getinfo("/"), self.fs.getinfo("/")) | ||
self.assertNotFound(self.cached.getinfo, "spam") | ||
scandir.assert_not_called() |