Skip to content

Commit

Permalink
pg.io.FileSystem to support os.PathLike objects as path.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 584387134
  • Loading branch information
daiyip authored and pyglove authors committed Nov 21, 2023
1 parent b367568 commit 3938023
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 47 deletions.
137 changes: 91 additions & 46 deletions pyglove/core/io/file_system.py
Expand Up @@ -67,42 +67,60 @@ class FileSystem(metaclass=abc.ABCMeta):
"""Interface for a file system."""

@abc.abstractmethod
def open(self, path: str, mode: str = 'r', **kwargs) -> File:
def open(
self, path: Union[str, os.PathLike[str]], mode: str = 'r', **kwargs
) -> File:
"""Opens a file with a path."""

@abc.abstractmethod
def exists(self, path: str) -> bool:
def exists(self, path: Union[str, os.PathLike[str]]) -> bool:
"""Returns True if a path exists."""

@abc.abstractmethod
def listdir(self, path: str) -> list[str]:
def listdir(self, path: Union[str, os.PathLike[str]]) -> list[str]:
"""Lists all files or sub-directories."""

@abc.abstractmethod
def isdir(self, path: str) -> bool:
def isdir(self, path: Union[str, os.PathLike[str]]) -> bool:
"""Returns True if a path is a directory."""

@abc.abstractmethod
def mkdir(self, path: str, mode: int = 0o777) -> None:
def mkdir(
self, path: Union[str, os.PathLike[str]], mode: int = 0o777
) -> None:
"""Makes a directory based on a path."""

@abc.abstractmethod
def mkdirs(
self, path: str, mode: int = 0o777, exist_ok: bool = False) -> None:
self,
path: Union[str, os.PathLike[str]],
mode: int = 0o777,
exist_ok: bool = False,
) -> None:
"""Makes a directory chain based on a path."""

@abc.abstractmethod
def rm(self, path: str) -> None:
def rm(self, path: Union[str, os.PathLike[str]]) -> None:
"""Removes a file based on a path."""

@abc.abstractmethod
def rmdir(self, path: str) -> bool:
def rmdir(self, path: Union[str, os.PathLike[str]]) -> bool:
"""Removes a directory based on a path."""

@abc.abstractmethod
def rmdirs(self, path: str) -> None:
def rmdirs(self, path: Union[str, os.PathLike[str]]) -> None:
"""Removes a directory chain based on a path."""


def _resolve_path(path: Union[str, os.PathLike[str]]) -> str:
if isinstance(path, str):
return path
elif hasattr(path, '__fspath__'):
return path.__fspath__()
else:
raise ValueError(f'Unsupported path: {path!r}.')


#
# The standard file system.
#
Expand Down Expand Up @@ -137,32 +155,40 @@ def close(self) -> None:
class StdFileSystem(FileSystem):
"""The standard file system."""

def open(self, path: str, mode: str = 'r', **kwargs) -> File:
def open(
self, path: Union[str, os.PathLike[str]], mode: str = 'r', **kwargs
) -> File:
return StdFile(io.open(path, mode, **kwargs))

def exists(self, path: str) -> bool:
def exists(self, path: Union[str, os.PathLike[str]]) -> bool:
return os.path.exists(path)

def listdir(self, path: str) -> list[str]:
def listdir(self, path: Union[str, os.PathLike[str]]) -> list[str]:
return os.listdir(path)

def isdir(self, path: str) -> bool:
def isdir(self, path: Union[str, os.PathLike[str]]) -> bool:
return os.path.isdir(path)

def mkdir(self, path: str, mode: int = 0o777) -> None:
def mkdir(
self, path: Union[str, os.PathLike[str]], mode: int = 0o777
) -> None:
os.mkdir(path, mode)

def mkdirs(
self, path: str, mode: int = 0o777, exist_ok: bool = False) -> None:
self,
path: Union[str, os.PathLike[str]],
mode: int = 0o777,
exist_ok: bool = False,
) -> None:
os.makedirs(path, mode, exist_ok)

def rm(self, path: str) -> None:
def rm(self, path: Union[str, os.PathLike[str]]) -> None:
os.remove(path)

def rmdir(self, path: str) -> None:
def rmdir(self, path: Union[str, os.PathLike[str]]) -> None:
os.rmdir(path)

def rmdirs(self, path: str) -> None:
def rmdirs(self, path: Union[str, os.PathLike[str]]) -> None:
os.removedirs(path)


Expand Down Expand Up @@ -206,10 +232,10 @@ def __init__(self, prefix: str = '/mem/'):
self._root = {}
self._prefix = prefix

def _internal_path(self, path: str) -> str:
return '/' + path.lstrip(self._prefix)
def _internal_path(self, path: Union[str, os.PathLike[str]]) -> str:
return '/' + _resolve_path(path).lstrip(self._prefix)

def _locate(self, path: str) -> Any:
def _locate(self, path: Union[str, os.PathLike[str]]) -> Any:
current = self._root
for x in self._internal_path(path).split('/'):
if not x:
Expand All @@ -219,7 +245,9 @@ def _locate(self, path: str) -> Any:
current = current[x]
return current

def open(self, path: str, mode: str = 'r', **kwargs) -> File:
def open(
self, path: Union[str, os.PathLike[str]], mode: str = 'r', **kwargs
) -> File:
file = self._locate(path)
if isinstance(file, dict):
raise IsADirectoryError(path)
Expand All @@ -234,19 +262,22 @@ def open(self, path: str, mode: str = 'r', **kwargs) -> File:
raise FileNotFoundError(path)
return file

def exists(self, path: str) -> bool:
def exists(self, path: Union[str, os.PathLike[str]]) -> bool:
return self._locate(path) is not None

def listdir(self, path: str) -> list[str]:
def listdir(self, path: Union[str, os.PathLike[str]]) -> list[str]:
d = self._locate(path)
if not isinstance(d, dict):
raise FileNotFoundError(path)
return list(d.keys())

def isdir(self, path: str) -> bool:
def isdir(self, path: Union[str, os.PathLike[str]]) -> bool:
return isinstance(self._locate(path), dict)

def _parent_and_name(self, path: str) -> tuple[dict[str, Any], str]:
def _parent_and_name(
self, path: Union[str, os.PathLike[str]]
) -> tuple[dict[str, Any], str]:
path = _resolve_path(path)
rpos = path.rfind('/')
assert rpos >= 0, path
name = path[rpos + 1:]
Expand All @@ -255,15 +286,21 @@ def _parent_and_name(self, path: str) -> tuple[dict[str, Any], str]:
raise FileNotFoundError(path)
return parent_dir, name

def mkdir(self, path: str, mode: int = 0o777) -> None:
def mkdir(
self, path: Union[str, os.PathLike[str]], mode: int = 0o777
) -> None:
del mode
parent_dir, name = self._parent_and_name(path)
if name in parent_dir:
raise FileExistsError(path)
parent_dir[name] = {}

def mkdirs(
self, path: str, mode: int = 0o777, exist_ok: bool = False) -> None:
self,
path: Union[str, os.PathLike[str]],
mode: int = 0o777,
exist_ok: bool = False,
) -> None:
del mode
current = self._root
dirs = self._internal_path(path).split('/')
Expand All @@ -280,7 +317,7 @@ def mkdirs(
raise NotADirectoryError(path)
current = entry

def rm(self, path: str) -> None:
def rm(self, path: Union[str, os.PathLike[str]]) -> None:
parent_dir, name = self._parent_and_name(path)
entry = parent_dir.get(name)
if entry is None:
Expand All @@ -289,7 +326,7 @@ def rm(self, path: str) -> None:
raise IsADirectoryError(path)
del parent_dir[name]

def rmdir(self, path: str) -> None:
def rmdir(self, path: Union[str, os.PathLike[str]]) -> None:
parent_dir, name = self._parent_and_name(path)
entry = parent_dir.get(name)
if entry is None:
Expand All @@ -300,7 +337,7 @@ def rmdir(self, path: str) -> None:
raise OSError(f'Directory not empty: {path!r}')
del parent_dir[name]

def rmdirs(self, path: str) -> None:
def rmdirs(self, path: Union[str, os.PathLike[str]]) -> None:
def _rmdir(dir_dict, subpath: str) -> bool:
if not subpath:
if dir_dict:
Expand Down Expand Up @@ -333,9 +370,9 @@ def add(self, prefix, fs: FileSystem) -> None:
self._filesystems.append((prefix, fs))
self._filesystems.sort(key=lambda x: x[0], reverse=True)

def get(self, path: str) -> FileSystem:
def get(self, path: Union[str, os.PathLike[str]]) -> FileSystem:
"""Gets the file system for a path."""
path = path.lower()
path = _resolve_path(path)
for prefix, fs in self._filesystems:
if path.startswith(prefix):
return fs
Expand All @@ -359,16 +396,17 @@ def add_file_system(prefix: str, fs: FileSystem) -> None:
#


def open(path: str, mode: str = 'r', **kwargs) -> File: # pylint:disable=redefined-builtin
def open(path: Union[str, os.PathLike[str]], mode: str = 'r', **kwargs) -> File: # pylint:disable=redefined-builtin
"""Opens a file with a path."""
return _fs.get(path).open(path, mode, **kwargs)


def readfile(
path: str,
path: Union[str, os.PathLike[str]],
mode: str = 'r',
nonexist_ok: bool = False,
**kwargs) -> Union[bytes, str, None]:
**kwargs,
) -> Union[bytes, str, None]:
"""Reads content from a file."""
try:
with _fs.get(path).open(path, mode=mode, **kwargs) as f:
Expand All @@ -380,54 +418,61 @@ def readfile(


def writefile(
path: str,
path: Union[str, os.PathLike[str]],
content: Union[str, bytes],
*,
mode: str = 'w',
**kwargs) -> None:
**kwargs,
) -> None:
"""Writes content to a file."""
with _fs.get(path).open(path, mode=mode, **kwargs) as f:
f.write(content)


def rm(path: str) -> None:
def rm(path: Union[str, os.PathLike[str]]) -> None:
"""Removes a file."""
_fs.get(path).rm(path)


def path_exists(path: str) -> bool:
def path_exists(path: Union[str, os.PathLike[str]]) -> bool:
"""Returns True if path exists."""
return _fs.get(path).exists(path)


def listdir(path: str, fullpath: bool = False) -> list[str]: # pylint: disable=redefined-builtin
def listdir(
path: Union[str, os.PathLike[str]], fullpath: bool = False
) -> list[str]: # pylint: disable=redefined-builtin
"""Lists all files or sub-directories under a dir."""
entries = _fs.get(path).listdir(path)
if fullpath:
return [os.path.join(path, entry) for entry in entries]
return entries


def isdir(path: str) -> bool:
def isdir(path: Union[str, os.PathLike[str]]) -> bool:
"""Returns True if path is a directory."""
return _fs.get(path).isdir(path)


def mkdir(path: str, mode: int = 0o777) -> None:
def mkdir(path: Union[str, os.PathLike[str]], mode: int = 0o777) -> None:
"""Makes a directory."""
_fs.get(path).mkdir(path, mode=mode)


def mkdirs(path: str, mode: int = 0o777, exist_ok: bool = False) -> None:
def mkdirs(
path: Union[str, os.PathLike[str]],
mode: int = 0o777,
exist_ok: bool = False,
) -> None:
"""Makes a directory chain."""
_fs.get(path).mkdirs(path, mode=mode, exist_ok=exist_ok)


def rmdir(path: str) -> bool:
def rmdir(path: Union[str, os.PathLike[str]]) -> bool:
"""Removes a directory."""
return _fs.get(path).rmdir(path)


def rmdirs(path: str) -> bool:
def rmdirs(path: Union[str, os.PathLike[str]]) -> bool:
"""Removes a directory chain until a parent directory is not empty."""
return _fs.get(path).rmdirs(path)
6 changes: 5 additions & 1 deletion pyglove/core/io/file_system_test.py
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import os
import pathlib
import tempfile
import unittest
from pyglove.core.io import file_system
Expand Down Expand Up @@ -91,6 +92,9 @@ def test_file(self):
with self.assertRaises(FileNotFoundError):
fs.open(file1)

with self.assertRaisesRegex(ValueError, 'Unsupported path'):
fs.open(1)

with fs.open(file1, 'w') as f:
f.write('hello\npyglove')

Expand Down Expand Up @@ -213,7 +217,7 @@ def test_standard_filesystem(self):
self.assertFalse(file_system.path_exists(file2))

def test_memory_filesystem(self):
file1 = '/mem/file1'
file1 = pathlib.Path('/mem/file1')
with self.assertRaises(FileNotFoundError):
file_system.readfile(file1)
self.assertIsNone(file_system.readfile(file1, nonexist_ok=True))
Expand Down

0 comments on commit 3938023

Please sign in to comment.