Skip to content

Commit

Permalink
Merge e4f22e3 into 84719be
Browse files Browse the repository at this point in the history
  • Loading branch information
chfw committed Aug 9, 2019
2 parents 84719be + e4f22e3 commit 519c892
Show file tree
Hide file tree
Showing 12 changed files with 228 additions and 35 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Expand Up @@ -7,11 +7,18 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [2.4.11] - Unreleased

### Added

- Added geturl for TarFS and ZipFS for 'fs' purpose. NoURL for 'download' purpose.
- Added helpful root path in CreateFailed exception [#340](https://github.com/PyFilesystem/pyfilesystem2/issues/340)

### Fixed

- Fixed tests leaving tmp files
- Fixed typing issues
- Fixed link namespace returning bytes
- Fixed broken FSURL in windows [#329](https://github.com/PyFilesystem/pyfilesystem2/issues/329)
- Fixed hidden exception at fs.close() when opening an absent zip/tar file URL [#33](https://github.com/PyFilesystem/pyfilesystem2/issues/333)

## [2.4.10] - 2019-07-29

Expand Down
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
Expand Up @@ -2,6 +2,7 @@

Many thanks to the following developers for contributing to this project:

- [C. W.](https://github.com/chfw)
- [Diego Argueta](https://github.com/dargueta)
- [Geoff Jukes](https://github.com/geoffjukes)
- [Giampaolo](https://github.com/gpcimino)
Expand Down
47 changes: 47 additions & 0 deletions fs/_url_tools.py
@@ -0,0 +1,47 @@
import re
import six
import platform


_WINDOWS_PLATFORM = platform.system() == "Windows"


def url_quote(path_snippet):
"""
On Windows, it will separate drive letter and quote windows
path alone. No magic on Unix-alie path, just pythonic
`pathname2url`
Arguments:
path_snippet: a file path, relative or absolute.
"""
# type: (Text) -> Text
if _WINDOWS_PLATFORM and _has_drive_letter(path_snippet):
drive_letter, path = path_snippet.split(":", 1)
if six.PY2:
path = path.encode("utf-8")
path = six.moves.urllib.request.pathname2url(path)
path_snippet = "{}:{}".format(drive_letter, path)
else:
if six.PY2:
path_snippet = path_snippet.encode("utf-8")
path_snippet = six.moves.urllib.request.pathname2url(path_snippet)
return path_snippet


def _has_drive_letter(path_snippet):
"""
The following path will get True
D:/Data
C:\\My Dcouments\\ test
And will get False
/tmp/abc:test
Arguments:
path_snippet: a file path, relative or absolute.
"""
# type: (Text) -> Text
windows_drive_pattern = ".:[/\\\\].*$"
return re.match(windows_drive_pattern, path_snippet) is not None
2 changes: 1 addition & 1 deletion fs/base.py
Expand Up @@ -1633,7 +1633,7 @@ def hash(self, path, name):
fs.errors.UnsupportedHash: If the requested hash is not supported.
"""
_path = self.validatepath(path)
self.validatepath(path)
try:
hash_object = hashlib.new(name)
except ValueError:
Expand Down
15 changes: 10 additions & 5 deletions fs/osfs.py
Expand Up @@ -39,7 +39,6 @@
sendfile = None # type: ignore # pragma: no cover

from . import errors
from .errors import FileExists
from .base import FS
from .enums import ResourceType
from ._fscompat import fsencode, fsdecode, fspath
Expand All @@ -49,6 +48,7 @@
from .error_tools import convert_os_errors
from .mode import Mode, validate_open_mode
from .errors import FileExpected, NoURL
from ._url_tools import url_quote

if False: # typing.TYPE_CHECKING
from typing import (
Expand Down Expand Up @@ -137,7 +137,8 @@ def __init__(
)
else:
if not os.path.isdir(_root_path):
raise errors.CreateFailed("root path does not exist")
message = "root path '{}' does not exist".format(_root_path)
raise errors.CreateFailed(message)

_meta = self._meta = {
"network": False,
Expand Down Expand Up @@ -526,7 +527,6 @@ def _scandir(self, path, namespaces=None):
namespaces = namespaces or ()
_path = self.validatepath(path)
sys_path = self.getsyspath(_path)
_sys_path = fsencode(sys_path)
with convert_os_errors("scandir", path, directory=True):
for entry_name in os.listdir(sys_path):
_entry_name = fsdecode(entry_name)
Expand Down Expand Up @@ -584,9 +584,14 @@ def getsyspath(self, path):

def geturl(self, path, purpose="download"):
# type: (Text, Text) -> Text
if purpose != "download":
sys_path = self.getsyspath(path)
if purpose == "download":
return "file://" + self.getsyspath(path)
elif purpose == "fs":
url_path = url_quote(sys_path)
return "osfs://" + url_path
else:
raise NoURL(path, purpose)
return "file://" + self.getsyspath(path)

def gettype(self, path):
# type: (Text) -> ResourceType
Expand Down
20 changes: 14 additions & 6 deletions fs/tarfs.py
Expand Up @@ -4,7 +4,6 @@
from __future__ import print_function
from __future__ import unicode_literals

import copy
import os
import tarfile
import typing
Expand All @@ -17,14 +16,14 @@
from .base import FS
from .compress import write_tar
from .enums import ResourceType
from .errors import IllegalBackReference
from .errors import IllegalBackReference, NoURL
from .info import Info
from .iotools import RawWrapper
from .opener import open_fs
from .path import dirname, relpath, basename, isbase, normpath, parts, frombase
from .path import relpath, basename, isbase, normpath, parts, frombase
from .wrapfs import WrapFS
from .permissions import Permissions

from ._url_tools import url_quote

if False: # typing.TYPE_CHECKING
from tarfile import TarInfo
Expand Down Expand Up @@ -461,16 +460,25 @@ def removedir(self, path):
def close(self):
# type: () -> None
super(ReadTarFS, self).close()
self._tar.close()
if hasattr(self, "_tar"):
self._tar.close()

def isclosed(self):
# type: () -> bool
return self._tar.closed # type: ignore

def geturl(self, path, purpose="download"):
# type: (Text, Text) -> Text
if purpose == "fs":
quoted_file = url_quote(self._file)
quoted_path = url_quote(path)
return "tar://{}!/{}".format(quoted_file, quoted_path)
else:
raise NoURL(path, purpose)


if __name__ == "__main__": # pragma: no cover
from fs.tree import render
from fs.opener import open_fs

with TarFS("tests.tar") as tar_fs:
print(tar_fs.listdir("/"))
Expand Down
1 change: 0 additions & 1 deletion fs/test.py
Expand Up @@ -1846,4 +1846,3 @@ def test_hash(self):
self.assertEqual(
foo_fs.hash("hashme.txt", "md5"), "9fff4bb103ab8ce4619064109c54cb9c"
)

13 changes: 12 additions & 1 deletion fs/zipfs.py
Expand Up @@ -22,6 +22,7 @@
from .path import dirname, forcedir, normpath, relpath
from .time import datetime_to_epoch
from .wrapfs import WrapFS
from ._url_tools import url_quote

if False: # typing.TYPE_CHECKING
from typing import (
Expand Down Expand Up @@ -434,7 +435,8 @@ def removedir(self, path):
def close(self):
# type: () -> None
super(ReadZipFS, self).close()
self._zip.close()
if hasattr(self, "_zip"):
self._zip.close()

def readbytes(self, path):
# type: (Text) -> bytes
Expand All @@ -444,3 +446,12 @@ def readbytes(self, path):
zip_name = self._path_to_zip_name(path)
zip_bytes = self._zip.read(zip_name)
return zip_bytes

def geturl(self, path, purpose="download"):
# type: (Text, Text) -> Text
if purpose == "fs":
quoted_file = url_quote(self._file)
quoted_path = url_quote(path)
return "zip://{}!/{}".format(quoted_file, quoted_path)
else:
raise errors.NoURL(path, purpose)
44 changes: 40 additions & 4 deletions tests/test_osfs.py
Expand Up @@ -9,9 +9,8 @@
import tempfile
import unittest

from fs import osfs
from fs import fsencode, fsdecode
from fs.path import relpath
from fs import osfs, open_fs
from fs.path import relpath, dirname
from fs import errors

from fs.test import FSTestCases
Expand Down Expand Up @@ -72,7 +71,7 @@ def assert_text(self, path, contents):

def test_not_exists(self):
with self.assertRaises(errors.CreateFailed):
fs = osfs.OSFS("/does/not/exists/")
osfs.OSFS("/does/not/exists/")

def test_expand_vars(self):
self.fs.makedir("TYRIONLANISTER")
Expand Down Expand Up @@ -157,3 +156,40 @@ def test_validatepath(self):
with self.assertRaises(errors.InvalidCharsInPath):
with self.fs.open("13 – Marked Register.pdf", "wb") as fh:
fh.write(b"foo")

def test_consume_geturl(self):
self.fs.create("foo")
try:
url = self.fs.geturl("foo", purpose="fs")
except errors.NoURL:
self.assertFalse(self.fs.hasurl("foo"))
else:
self.assertTrue(self.fs.hasurl("foo"))

# Should not throw an error
base_dir = dirname(url)
open_fs(base_dir)

def test_complex_geturl(self):
self.fs.makedirs("foo/bar ha")
test_fixtures = [
# test file, expected url path
["foo", "foo"],
["foo-bar", "foo-bar"],
["foo_bar", "foo_bar"],
["foo/bar ha/barz", "foo/bar%20ha/barz"],
["example b.txt", "example%20b.txt"],
["exampleㄓ.txt", "example%E3%84%93.txt"],
]
file_uri_prefix = "osfs://"
for test_file, relative_url_path in test_fixtures:
self.fs.create(test_file)
expected = file_uri_prefix + self.fs.getsyspath(relative_url_path).replace(
"\\", "/"
)
actual = self.fs.geturl(test_file, purpose="fs")

self.assertEqual(actual, expected)

def test_geturl_return_no_url(self):
self.assertRaises(errors.NoURL, self.fs.geturl, "test/path", "upload")
44 changes: 30 additions & 14 deletions tests/test_tarfs.py
Expand Up @@ -4,20 +4,16 @@
import io
import os
import six
import gzip
import tarfile
import getpass
import tarfile
import tempfile
import unittest
import uuid

from fs import tarfs
from fs import errors
from fs.enums import ResourceType
from fs.compress import write_tar
from fs.opener import open_fs
from fs.opener.errors import NotWriteable
from fs.errors import NoURL
from fs.test import FSTestCases

from .test_archives import ArchiveTestCases
Expand Down Expand Up @@ -96,15 +92,6 @@ def destroy_fs(self, fs):
os.remove(fs._tar_file)
del fs._tar_file

def assert_is_bzip(self):
try:
tarfile.open(fs._tar_file, "r:gz")
except tarfile.ReadError:
self.fail("{} is not a valid gz archive".format(fs._tar_file))
for other_comps in ["xz", "bz2", ""]:
with self.assertRaises(tarfile.ReadError):
tarfile.open(fs._tar_file, "r:{}".format(other_comps))


@unittest.skipIf(six.PY2, "Python2 does not support LZMA")
class TestWriteXZippedTarFS(FSTestCases, unittest.TestCase):
Expand Down Expand Up @@ -184,11 +171,40 @@ def test_read_from_filename(self):
except:
self.fail("Couldn't open tarfs from filename")

def test_read_non_existent_file(self):
fs = tarfs.TarFS(open(self._temp_path, "rb"))
# it has been very difficult to catch exception in __del__()
del fs._tar
try:
fs.close()
except AttributeError:
self.fail("Could not close tar fs properly")
except Exception:
self.fail("Strange exception in closing fs")

def test_getinfo(self):
super(TestReadTarFS, self).test_getinfo()
top = self.fs.getinfo("top.txt", ["tar"])
self.assertTrue(top.get("tar", "is_file"))

def test_geturl_for_fs(self):
test_fixtures = [
# test_file, expected
["foo/bar/egg/foofoo", "foo/bar/egg/foofoo"],
["foo/bar egg/foo foo", "foo/bar%20egg/foo%20foo"],
]
tar_file_path = self._temp_path.replace("\\", "/")
for test_file, expected_file in test_fixtures:
expected = "tar://{tar_file_path}!/{file_inside_tar}".format(
tar_file_path=tar_file_path, file_inside_tar=expected_file
)
self.assertEqual(self.fs.geturl(test_file, purpose="fs"), expected)

def test_geturl_for_download(self):
test_file = "foo/bar/egg/foofoo"
with self.assertRaises(NoURL):
self.fs.geturl(test_file)


class TestBrokenPaths(unittest.TestCase):
@classmethod
Expand Down

0 comments on commit 519c892

Please sign in to comment.