Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-3160: [Python] Improve pathlib.Path support in parquet and filesystem modules #2506

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
77 changes: 70 additions & 7 deletions python/pyarrow/filesystem.py
Expand Up @@ -15,11 +15,15 @@
# specific language governing permissions and limitations
# under the License.

from os.path import join as pjoin
import os
import inspect
import posixpath

from pyarrow.util import implements
from os.path import join as pjoin
from six.moves.urllib.parse import urlparse

import pyarrow as pa
from pyarrow.util import implements, _stringify_path


class FileSystem(object):
Expand Down Expand Up @@ -68,6 +72,7 @@ def disk_usage(self, path):
-------
usage : int
"""
path = _stringify_path(path)
path_info = self.stat(path)
if path_info['kind'] == 'file':
return path_info['size']
Expand Down Expand Up @@ -199,21 +204,25 @@ def get_instance(cls):

@implements(FileSystem.ls)
def ls(self, path):
    # Accept str or path-like input; os.listdir yields bare entry names,
    # so each one is joined back onto the directory before sorting.
    directory = _stringify_path(path)
    entries = [pjoin(directory, name) for name in os.listdir(directory)]
    entries.sort()
    return entries

@implements(FileSystem.mkdir)
def mkdir(self, path, create_parents=True):
    # os.makedirs creates intermediate directories, os.mkdir only the leaf.
    target = _stringify_path(path)
    make_directory = os.makedirs if create_parents else os.mkdir
    make_directory(target)

@implements(FileSystem.isdir)
def isdir(self, path):
    # Normalize str / path-like input before asking the OS.
    candidate = _stringify_path(path)
    return os.path.isdir(candidate)

@implements(FileSystem.isfile)
def isfile(self, path):
    # Normalize str / path-like input before asking the OS.
    candidate = _stringify_path(path)
    return os.path.isfile(candidate)

@implements(FileSystem._isfilestore)
Expand All @@ -222,24 +231,27 @@ def _isfilestore(self):

@implements(FileSystem.exists)
def exists(self, path):
    # Normalize str / path-like input before asking the OS.
    candidate = _stringify_path(path)
    return os.path.exists(candidate)

@implements(FileSystem.open)
def open(self, path, mode='rb'):
    """
    Open the file at *path* with the builtin ``open`` using *mode*
    (binary read by default) and return the resulting file object.
    """
    filename = _stringify_path(path)
    # Inside the method body ``open`` resolves to the builtin, not to this
    # method, because class scope is not an enclosing scope for functions.
    return open(filename, mode=mode)

@property
def pathsep(self):
    # Path component separator of the host operating system.
    separator = os.path.sep
    return separator

def walk(self, path):
    """
    Directory tree generator, see os.walk

    Parameters
    ----------
    path : str or path-like
        Root directory of the traversal; converted with _stringify_path
        before being handed to os.walk.

    Returns
    -------
    The generator returned by os.walk(path).
    """
    # Only the post-change signature and body are kept: the superseded
    # ``top_dir`` header and its early ``return os.walk(top_dir)`` would
    # otherwise shadow the path normalization below.
    path = _stringify_path(path)
    return os.walk(path)


class DaskFileSystem(FileSystem):
Expand Down Expand Up @@ -268,14 +280,17 @@ def _isfilestore(self):

@implements(FileSystem.delete)
def delete(self, path, recursive=False):
    # Delegates to the wrapped filesystem's ``rm`` after normalizing the
    # str / path-like argument.
    target = _stringify_path(path)
    outcome = self.fs.rm(target, recursive=recursive)
    return outcome

@implements(FileSystem.exists)
def exists(self, path):
    # Delegates to the wrapped filesystem's ``exists`` after normalizing
    # the str / path-like argument.
    target = _stringify_path(path)
    found = self.fs.exists(target)
    return found

@implements(FileSystem.mkdir)
def mkdir(self, path, create_parents=True):
path = _stringify_path(path)
if create_parents:
return self.fs.mkdirs(path)
else:
Expand All @@ -286,22 +301,26 @@ def open(self, path, mode='rb'):
"""
Open file for reading or writing
"""
path = _stringify_path(path)
return self.fs.open(path, mode=mode)

def ls(self, path, detail=False):
    """
    List *path* through the wrapped filesystem.

    *detail* is forwarded unchanged to ``self.fs.ls``; the shape of the
    result is whatever that call returns.
    """
    target = _stringify_path(path)
    listing = self.fs.ls(target, detail=detail)
    return listing

def walk(self, path):
    """
    Directory tree generator, like os.walk

    Parameters
    ----------
    path : str or path-like
        Root of the traversal; converted with _stringify_path before being
        handed to the wrapped filesystem.

    Returns
    -------
    Whatever ``self.fs.walk(path)`` returns.
    """
    # Only the post-change signature and body are kept: the superseded
    # ``top_path`` header and its early ``return self.fs.walk(top_path)``
    # would otherwise shadow the path normalization below.
    path = _stringify_path(path)
    return self.fs.walk(path)


class S3FSWrapper(DaskFileSystem):

@implements(FileSystem.isdir)
def isdir(self, path):
path = _stringify_path(path)
try:
contents = self.fs.ls(path)
if len(contents) == 1 and contents[0] == path:
Expand All @@ -313,6 +332,7 @@ def isdir(self, path):

@implements(FileSystem.isfile)
def isfile(self, path):
path = _stringify_path(path)
try:
contents = self.fs.ls(path)
return len(contents) == 1 and contents[0] == path
Expand All @@ -326,7 +346,7 @@ def walk(self, path, refresh=False):
Generator version of what is in s3fs, which yields a flattened list of
files
"""
path = path.replace('s3://', '')
path = _stringify_path(path).replace('s3://', '')
directories = set()
files = set()

Expand All @@ -350,3 +370,46 @@ def walk(self, path, refresh=False):
for directory in directories:
for tup in self.walk(directory, refresh=refresh):
yield tup


def _ensure_filesystem(fs):
    """
    Return a filesystem object usable through the arrow FileSystem interface.

    Parameters
    ----------
    fs : object
        Either an instance of a FileSystem subclass, or a foreign filesystem
        object whose class hierarchy contains a class named 'S3FileSystem'
        or 'LocalFileSystem'.

    Returns
    -------
    fs itself when it already is a FileSystem; an S3FSWrapper around fs for
    an 'S3FileSystem'; the result of LocalFileSystem.get_instance() for a
    foreign 'LocalFileSystem'.

    Raises
    ------
    IOError
        If fs is neither a FileSystem nor one of the recognized foreign
        classes.
    """
    fs_type = type(fs)

    # If the arrow filesystem was subclassed, assume it supports the full
    # interface and return it
    if issubclass(fs_type, FileSystem):
        return fs

    # Foreign filesystems are recognized by class name anywhere in the MRO
    # (presumably so the providing packages need not be imported here).
    for klass in inspect.getmro(fs_type):
        # Names must be compared by value: ``is`` on strings only succeeds
        # when both happen to be interned, which is not guaranteed.
        if klass.__name__ == 'S3FileSystem':
            return S3FSWrapper(fs)
        # In case it's a simple LocalFileSystem (e.g. dask) use native arrow
        # FS
        elif klass.__name__ == 'LocalFileSystem':
            return LocalFileSystem.get_instance()

    raise IOError('Unrecognized filesystem: {0}'.format(fs_type))


def _get_fs_from_path(path):
    """
    Return the filesystem implied by *path*, which could be an HDFS URI.

    Parameters
    ----------
    path : str or path-like
        A local path, or a URI such as hdfs://host:port/myfile.parquet

    Returns
    -------
    The result of ``pa.hdfs.connect(host=..., port=...)`` when the URI scheme
    is 'hdfs', otherwise the result of ``LocalFileSystem.get_instance()``.
    """
    path = _stringify_path(path)
    parsed_uri = urlparse(path)

    if parsed_uri.scheme != 'hdfs':
        return LocalFileSystem.get_instance()

    netloc_split = parsed_uri.netloc.split(':')
    # An empty host (e.g. hdfs:///myfile.parquet) maps to 'default'
    host = netloc_split[0] or 'default'

    # Port stays 0 unless the netloc is exactly "host:<digits>"
    port = 0
    # isdigit() is available on both byte and unicode strings, whereas
    # isnumeric() is missing on Python 2 ``str`` and also accepts characters
    # (e.g. fractions) that int() cannot parse.
    if len(netloc_split) == 2 and netloc_split[1].isdigit():
        port = int(netloc_split[1])

    return pa.hdfs.connect(host=host, port=port)
18 changes: 4 additions & 14 deletions python/pyarrow/io.pxi
Expand Up @@ -19,15 +19,17 @@
# arrow::ipc

from libc.stdlib cimport malloc, free
from pyarrow.compat import builtin_pickle, frombytes, tobytes, encode_file_path
from io import BufferedIOBase, UnsupportedOperation

import re
import six
import sys
import threading
import time
import warnings
from io import BufferedIOBase, UnsupportedOperation

from pyarrow.util import _stringify_path
from pyarrow.compat import builtin_pickle, frombytes, tobytes, encode_file_path


# 64K
Expand All @@ -40,18 +42,6 @@ cdef extern from "Python.h":
char *v, Py_ssize_t len) except NULL


def _stringify_path(path):
    """
    Return *path* as a str/unicode path.

    String inputs are returned unchanged; any other object is asked for its
    ``__fspath__()`` result. Raises TypeError("not a path-like object") when
    that lookup or call raises AttributeError.
    """
    if not isinstance(path, six.string_types):
        try:
            path = path.__fspath__()
        except AttributeError:
            raise TypeError("not a path-like object")
    return path


cdef class NativeFile:

def __cinit__(self):
Expand Down