Skip to content

Commit

Permalink
reorganize utility functions in parquet and filesystem modules; supp…
Browse files Browse the repository at this point in the history
…ort pathlib.Path in Filesystem API
  • Loading branch information
kszucs committed Sep 4, 2018
1 parent 583f21a commit 7e56bb5
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 92 deletions.
102 changes: 96 additions & 6 deletions python/pyarrow/filesystem.py
Expand Up @@ -15,10 +15,26 @@
# specific language governing permissions and limitations
# under the License.

from os.path import join as pjoin
import os
import six
import inspect
import posixpath

from os.path import join as pjoin
from six.moves.urllib.parse import urlparse

try:
# pathlib might not be available
try:
import pathlib
except ImportError:
# python 2 backport
import pathlib2 as pathlib
_has_pathlib = True
except ImportError:
_has_pathlib = False

import pyarrow as pa
from pyarrow.util import implements


Expand Down Expand Up @@ -68,6 +84,7 @@ def disk_usage(self, path):
-------
usage : int
"""
path = _ensure_str_path(path)
path_info = self.stat(path)
if path_info['kind'] == 'file':
return path_info['size']
Expand Down Expand Up @@ -199,21 +216,25 @@ def get_instance(cls):

@implements(FileSystem.ls)
def ls(self, path):
    """Return the sorted, path-joined entries of directory *path*."""
    directory = _ensure_str_path(path)
    entries = [pjoin(directory, name) for name in os.listdir(directory)]
    entries.sort()
    return entries

@implements(FileSystem.mkdir)
def mkdir(self, path, create_parents=True):
    """Create directory *path*; with *create_parents*, also make ancestors."""
    target = _ensure_str_path(path)
    # makedirs builds the whole chain of missing parents; mkdir only the leaf
    maker = os.makedirs if create_parents else os.mkdir
    maker(target)

@implements(FileSystem.isdir)
def isdir(self, path):
    """Return True if *path* names an existing directory."""
    return os.path.isdir(_ensure_str_path(path))

@implements(FileSystem.isfile)
def isfile(self, path):
    """Return True if *path* names an existing regular file."""
    return os.path.isfile(_ensure_str_path(path))

@implements(FileSystem._isfilestore)
Expand All @@ -222,24 +243,27 @@ def _isfilestore(self):

@implements(FileSystem.exists)
def exists(self, path):
    """Return True if *path* exists on the local filesystem."""
    return os.path.exists(_ensure_str_path(path))

@implements(FileSystem.open)
def open(self, path, mode='rb'):
    """
    Open file for reading or writing
    """
    # `open` here resolves to the builtin -- the method name lives on the
    # class and does not shadow it inside this function body.
    return open(_ensure_str_path(path), mode=mode)

@property
def pathsep(self):
    # Path separator of the local OS ('/' on POSIX, '\\' on Windows).
    return os.path.sep

def walk(self, top_dir):
def walk(self, path):
"""
Directory tree generator, see os.walk
"""
return os.walk(top_dir)
path = _ensure_str_path(path)
return os.walk(path)


class DaskFileSystem(FileSystem):
Expand Down Expand Up @@ -268,14 +292,17 @@ def _isfilestore(self):

@implements(FileSystem.delete)
def delete(self, path, recursive=False):
    """Remove *path* via the wrapped filesystem; *recursive* passes through."""
    return self.fs.rm(_ensure_str_path(path), recursive=recursive)

@implements(FileSystem.exists)
def exists(self, path):
    """Return True if *path* exists in the wrapped filesystem."""
    return self.fs.exists(_ensure_str_path(path))

@implements(FileSystem.mkdir)
def mkdir(self, path, create_parents=True):
path = _ensure_str_path(path)
if create_parents:
return self.fs.mkdirs(path)
else:
Expand All @@ -286,22 +313,26 @@ def open(self, path, mode='rb'):
"""
Open file for reading or writing
"""
path = _ensure_str_path(path)
return self.fs.open(path, mode=mode)

def ls(self, path, detail=False):
    """List *path* via the wrapped filesystem; *detail* passes through."""
    return self.fs.ls(_ensure_str_path(path), detail=detail)

def walk(self, top_path):
def walk(self, path):
"""
Directory tree generator, like os.walk
"""
return self.fs.walk(top_path)
path = _ensure_str_path(path)
return self.fs.walk(path)


class S3FSWrapper(DaskFileSystem):

@implements(FileSystem.isdir)
def isdir(self, path):
path = _ensure_str_path(path)
try:
contents = self.fs.ls(path)
if len(contents) == 1 and contents[0] == path:
Expand All @@ -313,6 +344,7 @@ def isdir(self, path):

@implements(FileSystem.isfile)
def isfile(self, path):
path = _ensure_str_path(path)
try:
contents = self.fs.ls(path)
return len(contents) == 1 and contents[0] == path
Expand All @@ -326,7 +358,7 @@ def walk(self, path, refresh=False):
Generator version of what is in s3fs, which yields a flattened list of
files
"""
path = path.replace('s3://', '')
path = _ensure_str_path(path).replace('s3://', '')
directories = set()
files = set()

Expand All @@ -350,3 +382,61 @@ def walk(self, path, refresh=False):
for directory in directories:
for tup in self.walk(directory, refresh=refresh):
yield tup


def is_path(x):
    """Return True if *x* is path-like: a string, or pathlib.Path when the
    pathlib (or pathlib2 backport) module is available."""
    if isinstance(x, six.string_types):
        return True
    return _has_pathlib and isinstance(x, pathlib.Path)


def _ensure_str_path(path):
    """Coerce *path* to a plain string.

    Accepts str/unicode and (when available) pathlib.Path; raises TypeError
    for anything else.
    """
    if _has_pathlib and isinstance(path, pathlib.Path):
        return str(path)
    if isinstance(path, six.string_types):
        return path
    raise TypeError('Path must be a string or an instance of pathlib.Path,'
                    ' got object with type: {}'.format(type(path)))


def _ensure_filesystem(fs):
    """Coerce *fs* into a pyarrow FileSystem instance.

    A FileSystem subclass is returned as-is. Third-party filesystems
    (s3fs / dask) are recognized by class name in the MRO -- avoiding a hard
    import dependency -- and wrapped appropriately.

    Raises
    ------
    IOError
        If *fs* is not a FileSystem and no known wrapper applies.
    """
    fs_type = type(fs)

    # If the arrow filesystem was subclassed, assume it supports the full
    # interface and return it
    if issubclass(fs_type, FileSystem):
        return fs

    for mro in inspect.getmro(fs_type):
        # BUG FIX: compare names with `==`, not `is` -- identity of equal
        # string literals is a CPython interning detail, not guaranteed.
        if mro.__name__ == 'S3FileSystem':
            return S3FSWrapper(fs)
        # In case its a simple LocalFileSystem (e.g. dask) use native arrow
        # FS
        elif mro.__name__ == 'LocalFileSystem':
            return LocalFileSystem.get_instance()

    raise IOError('Unrecognized filesystem: {0}'.format(fs_type))


def _get_fs_from_path(path):
    """
    Return the filesystem for *path*, which may be an HDFS URI.

    Parameters
    ----------
    path : str or pathlib.Path
        Local path, or HDFS URI such as hdfs://host:port/myfile.parquet

    Returns
    -------
    FileSystem
        A connected HDFS filesystem for 'hdfs' URIs, otherwise the local
        filesystem singleton.
    """
    # normalizes pathlib.Path inputs to str as well
    path = _ensure_str_path(path)
    parsed_uri = urlparse(path)
    if parsed_uri.scheme == 'hdfs':
        netloc_split = parsed_uri.netloc.split(':')
        host = netloc_split[0]
        if host == '':
            host = 'default'
        port = 0
        # NOTE(review): str.isnumeric also accepts non-ASCII digit forms;
        # non-numeric or multi-colon netlocs silently fall back to port 0
        if len(netloc_split) == 2 and netloc_split[1].isnumeric():
            port = int(netloc_split[1])
        fs = pa.hdfs.connect(host=host, port=port)
    else:
        fs = LocalFileSystem.get_instance()

    return fs
70 changes: 3 additions & 67 deletions python/pyarrow/parquet.py
Expand Up @@ -18,21 +18,13 @@
from collections import defaultdict
from concurrent import futures
import os
import inspect
import json
import re
import six
from six.moves.urllib.parse import urlparse
# pathlib might not be available in Python 2
try:
import pathlib
_has_pathlib = True
except ImportError:
_has_pathlib = False

import numpy as np

from pyarrow.filesystem import FileSystem, LocalFileSystem, S3FSWrapper
from pyarrow.filesystem import (LocalFileSystem, is_path, _ensure_str_path,
_ensure_filesystem, _get_fs_from_path)
from pyarrow._parquet import (ParquetReader, RowGroupStatistics, # noqa
FileMetaData, RowGroupMetaData,
ColumnChunkMetaData,
Expand Down Expand Up @@ -634,11 +626,6 @@ def filter_accepts_partition(self, part_key, filter, level):
filter[1])


def is_path(x):
    """Return True when *x* looks like a filesystem path (string, or
    pathlib.Path if pathlib is importable)."""
    if isinstance(x, six.string_types):
        return True
    return _has_pathlib and isinstance(x, pathlib.Path)


class ParquetManifest(object):
"""
Expand Down Expand Up @@ -957,34 +944,6 @@ def all_filters_accept(piece):
self.pieces = [p for p in self.pieces if all_filters_accept(p)]


def _ensure_filesystem(fs):
    """Coerce *fs* into a pyarrow FileSystem instance.

    FileSystem subclasses pass through unchanged; s3fs/dask filesystems are
    detected by class name in the MRO (no hard import dependency) and
    wrapped. Raises IOError for anything unrecognized.
    """
    fs_type = type(fs)

    # If the arrow filesystem was subclassed, assume it supports the full
    # interface and return it
    if issubclass(fs_type, FileSystem):
        return fs

    for mro in inspect.getmro(fs_type):
        # BUG FIX: use `==` for string comparison; `is` relies on CPython's
        # interning of literals and is not a correctness guarantee.
        if mro.__name__ == 'S3FileSystem':
            return S3FSWrapper(fs)
        # In case its a simple LocalFileSystem (e.g. dask) use native arrow
        # FS
        elif mro.__name__ == 'LocalFileSystem':
            return LocalFileSystem.get_instance()

    raise IOError('Unrecognized filesystem: {0}'.format(fs_type))


def _ensure_str_path(path):
    """Coerce *path* (str or pathlib.Path) to a plain string, raising
    TypeError for any other type."""
    if _has_pathlib and isinstance(path, pathlib.Path):
        return str(path)
    if isinstance(path, six.string_types):
        return path
    raise TypeError('Path must be a string or an instance of pathlib.Path')


def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1):
partitions = None
common_metadata_path = None
Expand Down Expand Up @@ -1103,7 +1062,7 @@ def write_table(table, where, row_group_size=None, version='1.0',
except Exception:
if is_path(where):
try:
os.remove(where)
os.remove(_ensure_str_path(where))
except os.error:
pass
raise
Expand Down Expand Up @@ -1267,26 +1226,3 @@ def read_schema(where):
schema : pyarrow.Schema
"""
return ParquetFile(where).schema.to_arrow_schema()


def _get_fs_from_path(path):
    """
    return filesystem from path which could be an HDFS URI
    """
    # Accept pathlib.Path by converting to a plain string up front.
    if _has_pathlib and isinstance(path, pathlib.Path):
        path = str(path)
    parsed_uri = urlparse(path)

    # Anything that is not an hdfs:// URI is served by the local filesystem.
    if parsed_uri.scheme != 'hdfs':
        return LocalFileSystem.get_instance()

    # input can be hdfs URI such as hdfs://host:port/myfile.parquet
    netloc_split = parsed_uri.netloc.split(':')
    host = netloc_split[0] or 'default'
    port = 0
    if len(netloc_split) == 2 and netloc_split[1].isnumeric():
        port = int(netloc_split[1])
    return pa.hdfs.connect(host=host, port=port)

0 comments on commit 7e56bb5

Please sign in to comment.