Improve TarFileSystem
amotl committed Mar 7, 2021
1 parent 6044473 commit dede47a
Showing 7 changed files with 257 additions and 45 deletions.
3 changes: 2 additions & 1 deletion fsspec/__init__.py
@@ -17,14 +17,15 @@
register_implementation,
registry,
)
from .spec import AbstractFileSystem
from .spec import AbstractArchiveFileSystem, AbstractFileSystem

__version__ = get_versions()["version"]
del get_versions


__all__ = [
"AbstractFileSystem",
"AbstractArchiveFileSystem",
"FSMap",
"filesystem",
"register_implementation",
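Exporting AbstractArchiveFileSystem from the package root makes the new base class part of the public API. A minimal sketch of how a downstream archive implementation could pick it up, mirroring the pattern used by the zip, libarchive and tar classes in this commit (the subclass name and protocol here are hypothetical):

from fsspec import AbstractArchiveFileSystem, AbstractFileSystem


class MyArchiveFileSystem(AbstractFileSystem, AbstractArchiveFileSystem):
    # Hypothetical subclass: keeps the usual AbstractFileSystem interface
    # and presumably inherits the shared archive helpers from the new mixin.
    protocol = "myarchive"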
22 changes: 3 additions & 19 deletions fsspec/implementations/libarchive.py
@@ -4,12 +4,12 @@

import libarchive

from fsspec import AbstractFileSystem, open_files
from fsspec import AbstractArchiveFileSystem, AbstractFileSystem, open_files
from fsspec.implementations.memory import MemoryFile
from fsspec.utils import DEFAULT_BLOCK_SIZE, tokenize
from fsspec.utils import DEFAULT_BLOCK_SIZE


class LibArchiveFileSystem(AbstractFileSystem):
class LibArchiveFileSystem(AbstractFileSystem, AbstractArchiveFileSystem):
"""Compressed archives as a file-system (read-only)
Supports the following formats:
@@ -181,19 +181,3 @@ def _open(
else:
raise ValueError
return MemoryFile(fs=self, path=path, data=data)

def ukey(self, path):
return tokenize(path, self.fo, self.protocol)

def _all_dirnames(self, paths):
"""Returns *all* directory names for each path in paths, including intermediate ones.
Parameters
----------
paths: Iterable of path strings
"""
if len(paths) == 0:
return set()

dirnames = {self._parent(path) for path in paths} - {self.root_marker}
return dirnames | self._all_dirnames(dirnames)
38 changes: 32 additions & 6 deletions fsspec/implementations/tar.py
@@ -1,23 +1,38 @@
import copy
import logging
import tarfile

import fsspec
from fsspec.compression import compr
from fsspec.spec import AbstractArchiveFileSystem
from fsspec.utils import infer_compression

typemap = {b"0": "file", b"5": "directory"}

logger = logging.getLogger("tar")


class TarFileSystem(fsspec.AbstractFileSystem):
class TarFileSystem(fsspec.AbstractFileSystem, AbstractArchiveFileSystem):
def __init__(
self, fo, index_store=None, storage_options=None, compression=None, **kwargs
):
super().__init__(**kwargs)
if isinstance(fo, str):
fo = fsspec.open(fo, **(storage_options or {})).open()

# Try to infer compression.
if compression is None:
try:
name = fo.info()["name"]
compression = infer_compression(name)
except Exception as ex:
logger.warning(f"Unable to infer compression: {ex}")

if compression:
# TODO: tarfile already implements compression with modes like "r:gz",
# but would seeking to an offset in the file still work then?
# TODO: compression="infer" is not supported here
fo = compr[compression](fo)

self.fo = fo
self.index_store = index_store
self.index = None
@@ -35,9 +50,11 @@ def _index(self):
self.index = out
# TODO: save index to self.index_store here, if set

def ls(self, path, detail=True, **kwargs):
def ls(self, path, detail=False, **kwargs):
path = self._strip_protocol(path)
parts = path.rstrip("/").split("/")
if parts and parts[0] == "":
parts = []
out = []
for name, (details, _) in self.index.items():
nparts = name.rstrip("/").split("/")
@@ -52,14 +69,17 @@ def ls(self, path, detail=True, **kwargs):
return [o["name"] for o in out]

def info(self, path, **kwargs):
return self.index[path][0]
try:
return self.index[path][0]
except KeyError:
raise FileNotFoundError(path)

def _open(self, path, mode="rb", **kwargs):
if mode != "rb":
raise ValueError("Read Only filesystem implementation")
raise ValueError("Read-only filesystem implementation")
details, offset = self.index[path]
if details["type"] != "file":
raise ValueError("Can only regilar files")
raise ValueError("Can only handle regular files")
newfo = copy.copy(self.fo)
newfo.seek(offset)
return TarContainedFile(newfo, self.info(path))
@@ -104,3 +124,9 @@ def seek(self, to, whence=0):
def close(self):
self.of.close()
self.closed = True

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
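Putting the new pieces together — compression handling, FileNotFoundError from info(), and the context-manager support on TarContainedFile — a minimal usage sketch, assuming a hypothetical local archive.tar.gz containing a member some/member; compression is passed explicitly here, although the constructor above will also try to infer it from the opened file's name:

import fsspec

# Hypothetical local archive; only reading is supported.
fs = fsspec.filesystem("tar", fo="archive.tar.gz", compression="gzip")

print(fs.ls(""))               # top-level entries of the archive
print(fs.info("some/member"))  # per-entry metadata; unknown paths raise FileNotFoundError

# TarContainedFile now works as a context manager thanks to __enter__/__exit__.
with fs.open("some/member") as f:  # read-only: only mode "rb" is accepted
    data = f.read()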
198 changes: 198 additions & 0 deletions fsspec/implementations/tests/test_tar.py
@@ -0,0 +1,198 @@
import os
import pickle
import sys
import tarfile
import tempfile
from contextlib import contextmanager
from io import BytesIO

import pytest

import fsspec


@contextmanager
def temptar(data={}):
f = tempfile.mkstemp(suffix="tar")[1]
with tarfile.TarFile(f, mode="w") as t:
for name, data in data.items():
# t.add("empty", arcname=name)

# Create directory hierarchy.
# https://bugs.python.org/issue22208#msg225558
if "/" in name:
current = []
for part in os.path.dirname(name).split("/"):
current.append(part)
info = tarfile.TarInfo("/".join(current))
info.type = tarfile.DIRTYPE
t.addfile(info)

info = tarfile.TarInfo(name=name)
info.size = len(data)
t.addfile(info, BytesIO(data))
try:
yield f
finally:
try:
os.remove(f)
except (IOError, OSError):
pass


data = {"a": b"", "b": b"hello", "deeply/nested/path": b"stuff"}


def test_empty():
with temptar() as t:
fs = fsspec.filesystem("tar", fo=t)
assert fs.find("") == []
assert fs.find("", withdirs=True) == []
with pytest.raises(FileNotFoundError):
fs.info("")
assert fs.ls("") == []


def test_glob():
with temptar(data) as t:
fs = fsspec.filesystem("tar", fo=t)
print("glob:", fs.glob("*"))
assert fs.glob("*/*/*th") == ["deeply/nested/path"]


@pytest.mark.xfail(sys.version_info < (3, 6), reason="zip-info odd on py35")
def test_mapping():
with temptar(data) as t:
fs = fsspec.filesystem("tar", fo=t)
m = fs.get_mapper("")
assert list(m) == ["a", "b", "deeply/nested/path"]
assert m["b"] == data["b"]


@pytest.mark.xfail(sys.version_info < (3, 6), reason="zip not supported on py35")
def test_pickle():
with temptar(data) as t:
fs = fsspec.filesystem("tar", fo=t)
fs2 = pickle.loads(pickle.dumps(fs))
assert fs2.cat("b") == b"hello"


def test_all_dirnames():
with temptar() as t:
fs = fsspec.filesystem("tar", fo=t)

# fx are files, dx are directories
assert fs._all_dirnames([]) == set()
assert fs._all_dirnames(["f1"]) == set()
assert fs._all_dirnames(["f1", "f2"]) == set()
assert fs._all_dirnames(["f1", "f2", "d1/f1"]) == {"d1"}
assert fs._all_dirnames(["f1", "d1/f1", "d1/f2"]) == {"d1"}
assert fs._all_dirnames(["f1", "d1/f1", "d2/f1"]) == {"d1", "d2"}
assert fs._all_dirnames(["d1/d1/d1/f1"]) == {"d1", "d1/d1", "d1/d1/d1"}


def test_ls():
with temptar(data) as t:
lhs = fsspec.filesystem("tar", fo=t)

assert lhs.ls("") == ["a", "b", "deeply/"]
assert lhs.ls("/") == lhs.ls("")

assert lhs.ls("deeply") == ["deeply/nested/"]
assert lhs.ls("deeply/") == lhs.ls("deeply")

assert lhs.ls("deeply/nested") == ["deeply/nested/path"]
assert lhs.ls("deeply/nested/") == lhs.ls("deeply/nested")


def test_find():
with temptar(data) as t:
lhs = fsspec.filesystem("tar", fo=t)

assert lhs.find("") == ["a", "b", "deeply/nested/path"]
assert lhs.find("", withdirs=True) == [
"a",
"b",
"deeply/",
"deeply/nested/",
"deeply/nested/path",
]

assert lhs.find("deeply") == ["deeply/nested/path"]
assert lhs.find("deeply/") == lhs.find("deeply")


def test_walk():
with temptar(data) as t:
fs = fsspec.filesystem("tar", fo=t)
expected = [
# (dirname, list of subdirs, list of files)
("", ["deeply"], ["a", "b"]),
("deeply", ["nested"], []),
("deeply/nested", [], ["path"]),
]
assert list(fs.walk("")) == expected


def test_info():
with temptar(data) as t:
fs_cache = fsspec.filesystem("tar", fo=t)

with pytest.raises(FileNotFoundError):
fs_cache.info("i-do-not-exist")

# Iterate over all directories.
# Unlike ZipFile, the tar index carries full metadata for directory entries.
for d in fs_cache._all_dirnames(data.keys()):
lhs = fs_cache.info(f"{d}/")
del lhs["chksum"]
expected = {
"name": f"{d}/",
"size": 0,
"type": "directory",
"devmajor": 0,
"devminor": 0,
"gid": 0,
"gname": "",
"linkname": "",
"mode": 420,
"mtime": 0,
"uid": 0,
"uname": "",
}
assert lhs == expected

# Iterate over all files
for f, v in data.items():
lhs = fs_cache.info(f)
assert lhs["name"] == f
assert lhs["size"] == len(v)
assert lhs["type"] == "file"

# There are several attributes specific to tar entries.
# "chksum" is one we can use to check that they are passed through.
assert "chksum" in lhs


"""
@pytest.mark.parametrize("scale", [128, 512, 4096])
def test_isdir_isfile(scale):
def make_nested_dir(i):
x = f"{i}"
table = x.maketrans("0123456789", "ABCDEFGHIJ")
return "/".join(x.translate(table))
scaled_data = {f"{make_nested_dir(i)}/{i}": b"" for i in range(1, scale + 1)}
with temptar(scaled_data) as t:
fs = fsspec.filesystem("tar", fo=t)
lhs_dirs, lhs_files = fs._all_dirnames(scaled_data.keys()), scaled_data.keys()
# Warm-up the Cache, this is done in both cases anyways...
fs._get_dirs()
entries = lhs_files | lhs_dirs
assert lhs_dirs == {e for e in entries if fs.isdir(e)}
assert lhs_files == {e for e in entries if fs.isfile(e)}
"""
22 changes: 3 additions & 19 deletions fsspec/implementations/zip.py
@@ -2,11 +2,11 @@

import zipfile

from fsspec import AbstractFileSystem, open_files
from fsspec.utils import DEFAULT_BLOCK_SIZE, tokenize
from fsspec import AbstractArchiveFileSystem, AbstractFileSystem, open_files
from fsspec.utils import DEFAULT_BLOCK_SIZE


class ZipFileSystem(AbstractFileSystem):
class ZipFileSystem(AbstractFileSystem, AbstractArchiveFileSystem):
"""Read contents of ZIP archive as a file-system
Keeps file object open while instance lives.
@@ -135,19 +135,3 @@ def _open(
out.size = info["size"]
out.name = info["name"]
return out

def ukey(self, path):
return tokenize(path, self.fo, self.protocol)

def _all_dirnames(self, paths):
"""Returns *all* directory names for each path in paths, including intermediate ones.
Parameters
----------
paths: Iterable of path strings
"""
if len(paths) == 0:
return set()

dirnames = {self._parent(path) for path in paths} - {self.root_marker}
return dirnames | self._all_dirnames(dirnames)
1 change: 1 addition & 0 deletions fsspec/registry.py
@@ -104,6 +104,7 @@ def register_implementation(name, cls, clobber=True, errtxt=None):
"err": 'HTTPFileSystem requires "requests" and "aiohttp" to be installed',
},
"zip": {"class": "fsspec.implementations.zip.ZipFileSystem"},
"tar": {"class": "fsspec.implementations.tar.TarFileSystem"},
"gcs": {
"class": "gcsfs.GCSFileSystem",
"err": "Please install gcsfs to access Google Storage",
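With the "tar" key registered, the implementation can be resolved by protocol name alone; a short sketch (the archive path is hypothetical):

import fsspec
from fsspec.registry import get_filesystem_class

# Lazily imports fsspec.implementations.tar.TarFileSystem on first use.
TarFileSystem = get_filesystem_class("tar")

# Equivalent shortcut used throughout the new tests above:
fs = fsspec.filesystem("tar", fo="/path/to/archive.tar")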
