Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
**/__pycache__
**/dist
**/build
**/build
**.pyc
2 changes: 1 addition & 1 deletion noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def install(session):

@nox.session(python=False)
def smoke(session):
    """Install the test dependencies and run the ``upath`` test suite.

    The suite is invoked with ``--skiphdfs``.
    """
    # A single install call covers every dependency; installing "pytest" on
    # its own first would only repeat work done by this call.
    session.install("pytest", "aiohttp", "requests")
    session.run("pytest", "--skiphdfs", "upath")


Expand Down
297 changes: 15 additions & 282 deletions upath/core.py
Original file line number Diff line number Diff line change
@@ -1,94 +1,9 @@
import os
import pathlib
import urllib
import re

from fsspec.registry import get_filesystem_class

from upath.errors import NotDirectoryError


class _FSSpecAccessor:
def __init__(self, parsed_url, *args, **kwargs):
self._url = parsed_url
cls = get_filesystem_class(self._url.scheme)
url_kwargs = cls._get_kwargs_from_urls(
urllib.parse.urlunparse(self._url)
)
url_kwargs.update(kwargs)
self._fs = cls(**url_kwargs)
if self._url.scheme in ["hdfs"]:
self._fs.root_marker = "/"

def argument_upath_self_to_filepath(self, func):
"""if arguments are passed to the wrapped function, and if the first
argument is a UniversalPath instance, that argument is replaced with
the UniversalPath's path attribute
"""

def wrapper(*args, **kwargs):
if args:
args = list(args)
first_arg = args.pop(0)
if not kwargs.get("path"):
if isinstance(first_arg, UniversalPath):
first_arg = first_arg.path
if not self._fs.root_marker and first_arg.startswith(
"/"
):
first_arg = first_arg[1:]
args.insert(0, first_arg)
args = tuple(args)
else:
if not self._fs.root_marker and kwargs["path"].startswith(
"/"
):
kwargs["path"] = kwargs["path"][1:]
if self._url.scheme == "hdfs":
if "trunicate" in kwargs:
kwargs.pop("trunicate")
if func.__name__ == "mkdir":
args = args[:1]

return func(*args, **kwargs)

return wrapper

def __getattribute__(self, item):
class_attrs = ["_url", "_fs"]
if item in class_attrs:
x = super().__getattribute__(item)
return x
class_methods = [
"__init__",
"__getattribute__",
"argument_upath_self_to_filepath",
]
if item in class_methods:
return lambda *args, **kwargs: getattr(_FSSpecAccessor, item)(
self, *args, **kwargs
)
if item == "__class__":
return _FSSpecAccessor
d = object.__getattribute__(self, "__dict__")
fs = d.get("_fs", None)
if fs is not None:
method = getattr(fs, item, None)
if method:
return lambda *args, **kwargs: (
self.argument_upath_self_to_filepath(method)(
*args, **kwargs
)
) # noqa: E501
else:
raise NotImplementedError(
f"{fs.protocol} filesystem has not attribute {item}"
)


class PureUniversalPath(pathlib.PurePath):
    """``PurePath`` subclass that parses paths with pathlib's POSIX flavour."""

    # NOTE(review): ``_posix_flavour`` is a private pathlib attribute --
    # confirm it exists on every Python version this package supports.
    _flavour = pathlib._posix_flavour
    # No per-instance attributes are added by this class.
    __slots__ = ()
from upath.registry import _registry
from upath.universal_path import UniversalPath


class UPath(pathlib.Path):
Expand All @@ -101,209 +16,27 @@ def __new__(cls, *args, **kwargs):
val = kwargs.get(key)
if val:
parsed_url._replace(**{key: val})
# treat as local filesystem, return PosixPath or
if not parsed_url.scheme:
cls = (
pathlib.WindowsPath
if os.name == "nt"
else pathlib.PosixPath
)
self = cls._from_parts(args, init=False)
if not self._flavour.is_supported:
raise NotImplementedError(
"cannot instantiate %r on your system" % (cls.__name__,)
)
self._init()
else:
cls = UniversalPath
# cls._url = parsed_url
if parsed_url.scheme in _registry:
cls = _registry[parsed_url.scheme]
else:
cls = UniversalPath
kwargs["_url"] = parsed_url
new_args.insert(0, parsed_url.path)
args = tuple(new_args)

if cls is UniversalPath:
self = cls._from_parts_init(args, init=False)
else:
self = cls._from_parts(args, init=False)
if not self._flavour.is_supported:
raise NotImplementedError(
"cannot instantiate %r on your system" % (cls.__name__,)
)
if cls is UniversalPath:
self._init(*args, **kwargs)
else:
self._init()
self = cls._from_parts_init(args, init=False)
self._init(*args, **kwargs)
return self


class UniversalPath(UPath, PureUniversalPath):
    """Path whose filesystem operations are delegated to an accessor object.

    The parsed URL the path was built from is kept in ``_url`` and is used to
    render the scheme and netloc back into the string form.  Accessing any
    name listed in ``not_implemented`` raises ``NotImplementedError``.
    """

    __slots__ = ("_url", "_kwargs", "_closed", "fs")

    # Attribute names that ``__getattribute__`` refuses outright.
    not_implemented = [
        "cwd",
        "home",
        "expanduser",
        "group",
        "is_mount",
        "is_symlink",
        "is_socket",
        "is_fifo",
        "is_block_device",
        "is_char_device",
        "lchmod",
        "lstat",
        "owner",
        "readlink",
    ]

    def _init(self, *args, template=None, **kwargs):
        """Finish construction: store the URL, fix the root, attach accessor.

        ``kwargs`` are remembered in ``_kwargs`` so that derived paths can be
        initialised the same way; when called without keyword arguments the
        remembered ones are reused.  If ``template`` is given its accessor is
        shared instead of creating a new one.
        """
        self._closed = False
        if not kwargs:
            kwargs = dict(**self._kwargs)
        else:
            self._kwargs = dict(**kwargs)
        self._url = kwargs.pop("_url") if kwargs.get("_url") else None

        if not self._root:
            if not self._parts:
                self._root = "/"
            elif self._parts[0] == "/":
                self._root = self._parts.pop(0)
        # Drop any cached string form so it is rebuilt with the final root.
        if getattr(self, "_str", None):
            delattr(self, "_str")
        if template is not None:
            self._accessor = template._accessor
        else:
            self._accessor = _FSSpecAccessor(self._url, *args, **kwargs)
        self.fs = self._accessor._fs

    def __getattribute__(self, item):
        """Report ``UniversalPath`` as the class and block unsupported names."""
        if item == "__class__":
            return UniversalPath
        if item in UniversalPath.not_implemented:
            raise NotImplementedError(f"UniversalPath has no attribute {item}")
        return super().__getattribute__(item)

    def _format_parsed_parts(self, drv, root, parts):
        """Return ``scheme:[//netloc]path`` for the given parsed parts."""
        # Guard against empty ``parts`` (same precaution as ``path``).
        join_parts = parts[1:] if parts and parts[0] == "/" else parts
        if drv or root:
            path = drv + root + self._flavour.join(join_parts)
        else:
            path = self._flavour.join(join_parts)
        scheme, netloc = self._url.scheme, self._url.netloc
        scheme = scheme + ":"
        netloc = "//" + netloc if netloc else ""
        return scheme + netloc + path

    @property
    def path(self):
        """The path component only (no scheme or netloc); ``/`` if empty."""
        if not self._parts:
            return "/"
        join_parts = self._parts[1:] if self._parts[0] == "/" else self._parts
        return self._root + self._flavour.join(join_parts)

    def _strip_own_prefix(self, name):
        """Remove this path (with or without its leading slash) from ``name``."""
        sp = self.path
        # The path is escaped so that characters such as "." or "+" in it
        # are matched literally rather than as regex syntax.
        pattern = f"^({re.escape(sp)}|{re.escape(sp[1:])})/"
        return re.sub(pattern, "", name)

    def open(self, *args, **kwargs):
        """Open this path through the accessor, forwarding all arguments."""
        return self._accessor.open(self, *args, **kwargs)

    def iterdir(self):
        """Iterate over the files in this directory. Does not yield any
        result for the special paths '.' and '..'.
        """
        if self._closed:
            self._raise_closed()
        for name in self._accessor.listdir(self):
            # Entries may be dictionaries; the path is under "name".
            if isinstance(name, dict):
                name = name.get("name")
            if name in {".", ".."}:
                # Yielding a path object for these makes little sense
                continue
            # Only the entry name relative to this directory is wanted.
            yield self._make_child_relpath(self._strip_own_prefix(name))
        if self._closed:
            self._raise_closed()

    def exists(self):
        """Whether this path exists.

        Uses the accessor's ``exists`` when it is available and otherwise
        falls back to ``stat``, treating ``FileNotFoundError`` as "missing".
        """
        try:
            accessor_exists = self._accessor.exists
        except (AttributeError, NotImplementedError):
            # The accessor signals a missing filesystem attribute with
            # NotImplementedError.
            accessor_exists = None
        if accessor_exists:
            return accessor_exists(self)
        try:
            self._accessor.stat(self)
        except FileNotFoundError:
            return False
        return True

    def is_dir(self):
        """True when the accessor's ``info`` reports type ``"directory"``."""
        return self._accessor.info(self)["type"] == "directory"

    def is_file(self):
        """True when the accessor's ``info`` reports type ``"file"``."""
        return self._accessor.info(self)["type"] == "file"

    def glob(self, pattern):
        """Yield child paths matching ``pattern`` relative to this path."""
        path = self.joinpath(pattern)
        for name in self._accessor.glob(self, path=path.path):
            name = self._strip_own_prefix(name)
            name = name.split(self._flavour.sep)
            yield self._make_child(self._parts + name)

    def rename(self, target):
        """Not supported; always raises ``NotImplementedError``."""
        # can be implemented, but may be tricky
        raise NotImplementedError

    def touch(self, trunicate=True, **kwargs):
        """Call the accessor's ``touch`` for this path.

        NOTE(review): the keyword is spelled ``trunicate`` and is forwarded
        under that name -- confirm the filesystem expects this spelling.
        """
        self._accessor.touch(self, trunicate=trunicate, **kwargs)

    def unlink(self, missing_ok=False):
        """Remove this path non-recursively.

        Raises ``FileNotFoundError`` if the path does not exist, unless
        ``missing_ok`` is true, in which case nothing happens.
        """
        if not self.exists():
            if not missing_ok:
                raise FileNotFoundError
            return
        self._accessor.rm(self, recursive=False)

    def rmdir(self, recursive=True):
        """Remove this directory, recursively by default.

        Raises ``NotDirectoryError`` when the path is not a directory.
        """
        # Explicit check instead of ``assert`` so it still runs under -O.
        if not self.is_dir():
            raise NotDirectoryError
        self._accessor.rm(self, recursive=recursive)

    @classmethod
    def _from_parts_init(cls, args, init=False):
        """Class-level constructor that defers to the parent ``_from_parts``."""
        return super()._from_parts(args, init=init)

    def _from_parts(self, args, init=True):
        """Build a new ``UniversalPath`` from ``args`` using this instance."""
        # We need to call _parse_args on the instance, so as to get the
        # right flavour.
        obj = object.__new__(UniversalPath)
        drv, root, parts = self._parse_args(args)
        obj._drv = drv
        obj._root = root
        obj._parts = parts
        if init:
            obj._init(**self._kwargs)
        return obj

    def _from_parsed_parts(self, drv, root, parts, init=True):
        """Build a new ``UniversalPath`` from already parsed components."""
        obj = object.__new__(UniversalPath)
        obj._drv = drv
        obj._root = root
        obj._parts = parts
        if init:
            obj._init(**self._kwargs)
        return obj
Empty file.
35 changes: 35 additions & 0 deletions upath/implementations/http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import urllib

from upath.universal_path import UniversalPath, _FSSpecAccessor


class _HTTPAccessor(_FSSpecAccessor):
    """Accessor that hands full URLs, rather than bare paths, to the
    wrapped filesystem.  Construction is inherited unchanged.
    """

    def argument_upath_self_to_filepath(self, func):
        """Wrap ``func`` so that it receives a complete URL string.

        If a truthy ``path`` keyword is supplied, it is combined with this
        accessor's parsed URL into a full URL and the leading positional
        argument (the path object the call was made on) is discarded.
        Otherwise a leading ``UniversalPath`` positional argument is replaced
        by ``str()`` of it; any other leading argument is passed through
        untouched.
        """
        # Imported from the submodule explicitly: a bare ``import urllib``
        # does not guarantee that ``urllib.parse`` is loaded.
        from urllib.parse import urlunparse

        def wrapper(*args, **kwargs):
            if kwargs.get("path"):
                # ``_replace`` is the parsed-URL API for swapping one
                # component; the result is turned back into a URL string.
                new_url = self._url._replace(path=kwargs["path"])
                kwargs["path"] = urlunparse(new_url)
                args = args[1:]
            elif args:
                first_arg, *rest = args
                if isinstance(first_arg, UniversalPath):
                    first_arg = str(first_arg)
                # Always put the first argument back so that non-path
                # positional arguments are never lost.
                args = (first_arg, *rest)
            return func(*args, **kwargs)

        return wrapper


class HTTPPath(UniversalPath):
    """``UniversalPath`` subclass that names ``_HTTPAccessor`` as its accessor."""

    # NOTE(review): presumably read by UniversalPath when it creates the
    # accessor -- confirm against upath/universal_path.py.
    _default_accessor = _HTTPAccessor
3 changes: 3 additions & 0 deletions upath/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from upath.implementations import http

# Maps a URL scheme to the path class to instantiate for it; schemes that are
# not listed fall back to UniversalPath in UPath.__new__.
# NOTE(review): only "http" is registered, so "https" URLs are not mapped to
# HTTPPath here -- confirm whether that is intended.
_registry = {"http": http.HTTPPath}
Empty file.
10 changes: 10 additions & 0 deletions upath/tests/implementations/test_http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import pytest # noqa: F401

from upath import UPath
from upath.implementations.http import HTTPPath


def test_httppath():
    """An ``http://`` URL yields an ``HTTPPath`` that reports it exists."""
    http_path = UPath("http://example.com")
    assert isinstance(http_path, HTTPPath)
    assert http_path.exists()
1 change: 0 additions & 1 deletion upath/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ def test_home(self):

def test_stat(self):
    """``stat()`` on the fixture path returns a truthy result."""
    # The debugging ``print`` was dropped; the assertion alone is the check.
    stat = self.path.stat()
    assert stat

def test_chmod(self):
Expand Down
Loading