Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 61 additions & 19 deletions fsspec/asyn.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@

from .callbacks import _DEFAULT_CALLBACK
from .exceptions import FSTimeoutError
from .implementations.local import (
LocalFileSystem,
make_path_posix,
trailing_sep,
trailing_sep_maybe_asterisk,
)
from .spec import AbstractBufferedFile, AbstractFileSystem
from .utils import is_exception, other_paths

Expand Down Expand Up @@ -336,15 +342,23 @@ async def _copy(
elif on_error is None:
on_error = "raise"

source_is_str = isinstance(path1, str)
paths = await self._expand_path(path1, maxdepth=maxdepth, recursive=recursive)
if source_is_str and (not recursive or maxdepth is not None):
# Non-recursive glob does not copy directories
paths = [p for p in paths if not (trailing_sep(p) or await self._isdir(p))]
if not paths:
return

isdir = isinstance(path2, str) and (
path2.endswith("/") or await self._isdir(path2)
trailing_sep(path2) or await self._isdir(path2)
)
path2 = other_paths(
paths,
path2,
exists=isdir and isinstance(path1, str) and not path1.endswith("/"),
exists=isdir and source_is_str and not trailing_sep_maybe_asterisk(path1),
is_dir=isdir,
flatten=not source_is_str,
)
batch_size = batch_size or self.batch_size
coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths, path2)]
Expand Down Expand Up @@ -466,6 +480,7 @@ async def _put(
recursive=False,
callback=_DEFAULT_CALLBACK,
batch_size=None,
maxdepth=None,
**kwargs,
):
"""Copy file(s) from local.
Expand All @@ -481,21 +496,27 @@ async def _put(
constructor, or for all instances by setting the "gather_batch_size" key
in ``fsspec.config.conf``, falling back to 1/8th of the system limit .
"""
from .implementations.local import LocalFileSystem, make_path_posix

rpath = self._strip_protocol(rpath)
if isinstance(lpath, str):
source_is_str = isinstance(lpath, str)
if source_is_str:
lpath = make_path_posix(lpath)
fs = LocalFileSystem()
lpaths = fs.expand_path(lpath, recursive=recursive)
lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
if source_is_str and (not recursive or maxdepth is not None):
# Non-recursive glob does not copy directories
lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
if not lpaths:
return

isdir = isinstance(rpath, str) and (
rpath.endswith("/") or await self._isdir(rpath)
trailing_sep(rpath) or await self._isdir(rpath)
)
rpath = self._strip_protocol(rpath)
rpaths = other_paths(
lpaths,
rpath,
exists=isdir and isinstance(lpath, str) and not lpath.endswith("/"),
exists=isdir and source_is_str and not trailing_sep_maybe_asterisk(lpath),
is_dir=isdir,
flatten=not source_is_str,
)

is_dir = {l: os.path.isdir(l) for l in lpaths}
Expand All @@ -519,7 +540,13 @@ async def _get_file(self, rpath, lpath, **kwargs):
raise NotImplementedError

async def _get(
self, rpath, lpath, recursive=False, callback=_DEFAULT_CALLBACK, **kwargs
self,
rpath,
lpath,
recursive=False,
callback=_DEFAULT_CALLBACK,
maxdepth=None,
**kwargs,
):
"""Copy file(s) to local.

Expand All @@ -535,21 +562,31 @@ async def _get(
constructor, or for all instances by setting the "gather_batch_size" key
in ``fsspec.config.conf``, falling back to 1/8th of the system limit .
"""
from fsspec.implementations.local import LocalFileSystem, make_path_posix

source_is_str = isinstance(rpath, str)
# First check for rpath trailing slash as _strip_protocol removes it.
rpath_trailing_slash = isinstance(rpath, str) and rpath.endswith("/")
source_not_trailing_sep = source_is_str and not trailing_sep_maybe_asterisk(
rpath
)
rpath = self._strip_protocol(rpath)
lpath = make_path_posix(lpath)
rpaths = await self._expand_path(rpath, recursive=recursive)
if source_is_str and (not recursive or maxdepth is not None):
# Non-recursive glob does not copy directories
rpaths = [
p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
]
if not rpaths:
return

lpath = make_path_posix(lpath)
isdir = isinstance(lpath, str) and (
lpath.endswith("/") or LocalFileSystem().isdir(lpath)
trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
)
lpaths = other_paths(
rpaths,
lpath,
exists=isdir and not rpath_trailing_slash,
exists=isdir and source_not_trailing_sep,
is_dir=isdir,
flatten=not source_is_str,
)
[os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
batch_size = kwargs.pop("batch_size", self.batch_size)
Expand Down Expand Up @@ -766,9 +803,16 @@ async def _expand_path(self, path, recursive=False, maxdepth=None):
bit = set(await self._glob(p))
out |= bit
if recursive:
# glob call above expanded one depth so if maxdepth is defined
# then decrement it in expand_path call below. If it is zero
# after decrementing then avoid expand_path call.
if maxdepth is not None and maxdepth <= 1:
continue
out |= set(
await self._expand_path(
list(bit), recursive=recursive, maxdepth=maxdepth
list(bit),
recursive=recursive,
maxdepth=maxdepth - 1 if maxdepth is not None else None,
)
)
continue
Expand All @@ -778,8 +822,6 @@ async def _expand_path(self, path, recursive=False, maxdepth=None):
if p not in out and (recursive is False or (await self._exists(p))):
# should only check once, for the root
out.add(p)
# reduce depth on each recursion level unless None or 0
maxdepth = maxdepth if not maxdepth else maxdepth - 1
if not out:
raise FileNotFoundError(path)
return list(sorted(out))
Expand Down
8 changes: 3 additions & 5 deletions fsspec/implementations/tests/local/local_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@


class LocalFixtures(AbstractFixtures):
@staticmethod
@pytest.fixture
def fs():
@pytest.fixture(scope="class")
def fs(self):
return LocalFileSystem(auto_mkdir=True)

@staticmethod
@pytest.fixture
def fs_path(tmpdir):
def fs_path(self, tmpdir):
return str(tmpdir)
11 changes: 4 additions & 7 deletions fsspec/implementations/tests/memory/memory_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@


class MemoryFixtures(AbstractFixtures):
@staticmethod
@pytest.fixture
def fs():
@pytest.fixture(scope="class")
def fs(self):
m = filesystem("memory")
m.store.clear()
m.pseudo_dirs.clear()
Expand All @@ -19,12 +18,10 @@ def fs():
m.pseudo_dirs.clear()
m.pseudo_dirs.append("")

@staticmethod
@pytest.fixture
def fs_join():
def fs_join(self):
return lambda *args: "/".join(args)

@staticmethod
@pytest.fixture
def fs_path():
def fs_path(self):
return ""
18 changes: 17 additions & 1 deletion fsspec/implementations/tests/memory/memory_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import pytest

import fsspec.tests.abstract as abstract
from fsspec.implementations.tests.memory.memory_fixtures import MemoryFixtures

Expand All @@ -7,7 +9,21 @@ class TestMemoryCopy(abstract.AbstractCopyTests, MemoryFixtures):


class TestMemoryGet(abstract.AbstractGetTests, MemoryFixtures):
pass
@pytest.mark.skip(reason="Bug: does not auto-create new directory")
def test_get_file_to_new_directory(self):
pass

@pytest.mark.skip(reason="Bug: does not auto-create new directory")
def test_get_file_to_file_in_new_directory(self):
pass

@pytest.mark.skip(reason="Bug: does not auto-create new directory")
def test_get_glob_to_new_directory(self):
pass

@pytest.mark.skip(reason="Bug: does not auto-create new directory")
def test_get_list_of_files_to_new_directory(self):
pass


class TestMemoryPut(abstract.AbstractPutTests, MemoryFixtures):
Expand Down
13 changes: 0 additions & 13 deletions fsspec/implementations/tests/test_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,6 @@ def test_strip(m):
assert m._strip_protocol("/b/c/") == "/b/c"


def test_put_single(m, tmpdir):
fn = os.path.join(str(tmpdir), "dir")
os.mkdir(fn)
open(os.path.join(fn, "abc"), "w").write("text")
m.put(fn, "/test") # no-op, no files
assert m.isdir("/test")
assert not m.exists("/test/abc")
assert not m.exists("/test/dir")
m.put(fn, "/test", recursive=True)
assert m.isdir("/test/dir")
assert m.cat("/test/dir/abc") == b"text"


def test_ls(m):
m.mkdir("/dir")
m.mkdir("/dir/dir1")
Expand Down
2 changes: 1 addition & 1 deletion fsspec/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ def register_implementation(name, cls, clobber=False, errtxt=None):
"box": {
"class": "boxfs.BoxFileSystem",
"err": "Please install boxfs to access BoxFileSystem",
}
},
}


Expand Down
8 changes: 4 additions & 4 deletions fsspec/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -976,23 +976,23 @@ def put(
trailing_sep_maybe_asterisk,
)

if isinstance(lpath, str):
source_is_str = isinstance(lpath, str)
if source_is_str:
lpath = make_path_posix(lpath)
fs = LocalFileSystem()
source_is_str = isinstance(lpath, str)
lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
if source_is_str and (not recursive or maxdepth is not None):
# Non-recursive glob does not copy directories
lpaths = [p for p in lpaths if not (trailing_sep(p) or self.isdir(p))]
lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
if not lpaths:
return

isdir = isinstance(rpath, str) and (trailing_sep(rpath) or self.isdir(rpath))
rpath = (
self._strip_protocol(rpath)
if isinstance(rpath, str)
else [self._strip_protocol(p) for p in rpath]
)
isdir = isinstance(rpath, str) and (trailing_sep(rpath) or self.isdir(rpath))
rpaths = other_paths(
lpaths,
rpath,
Expand Down
Loading