6 changes: 6 additions & 0 deletions python/.gitignore
@@ -34,3 +34,9 @@ coverage.xml
# benchmark working dir
.asv
pyarrow/_table_api.h

# manylinux1 temporary files
manylinux1/arrow

# plasma store
pyarrow/plasma_store
10 changes: 10 additions & 0 deletions python/doc/source/api.rst
@@ -164,6 +164,16 @@ Input / Output and Shared Memory
create_memory_map
PythonFile

Filesystems
-----------

.. autosummary::
:toctree: generated/

hdfs.connect
HadoopFilesystem
LocalFilesystem

.. _api.ipc:

Interprocess Communication and Messaging
15 changes: 8 additions & 7 deletions python/doc/source/filesystems.rst
@@ -31,12 +31,13 @@ System. You connect like so:
.. code-block:: python

import pyarrow as pa
hdfs = pa.HdfsClient(host, port, user=user, kerb_ticket=ticket_cache_path)
hdfs = pa.hdfs.connect(host, port, user=user, kerb_ticket=ticket_cache_path)
type(hdfs)

By default, ``pyarrow.HdfsClient`` uses libhdfs, a JNI-based interface to the
Java Hadoop client. This library is loaded **at runtime** (rather than at link
/ library load time, since the library may not be in your LD_LIBRARY_PATH), and
relies on some environment variables.
By default, ``pyarrow.hdfs.HadoopFilesystem`` uses libhdfs, a JNI-based
interface to the Java Hadoop client. This library is loaded **at runtime**
(rather than at link / library load time, since the library may not be in your
LD_LIBRARY_PATH), and relies on some environment variables.

* ``HADOOP_HOME``: the root of your installed Hadoop distribution. Often has
`lib/native/libhdfs.so`.
@@ -56,5 +57,5 @@ You can also use libhdfs3, a third-party C++ library for HDFS from Pivotal Labs:

.. code-block:: python

hdfs3 = pa.HdfsClient(host, port, user=user, kerb_ticket=ticket_cache_path,
driver='libhdfs3')
hdfs3 = pa.hdfs.connect(host, port, user=user, kerb_ticket=ticket_cache_path,
driver='libhdfs3')
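
Putting the pieces of this page together, a typical session looks roughly like the sketch below. The host, port, user, and paths are placeholders, and ``HADOOP_HOME`` is only one of the environment variables the default libhdfs (JNI) driver may need, as described above.

.. code-block:: python

   import os
   import pyarrow as pa

   # Needed by the JNI-based libhdfs driver; point it at your Hadoop install.
   # Other variables may also be required depending on your environment.
   os.environ['HADOOP_HOME'] = '/usr/lib/hadoop'

   # Default driver (libhdfs) ...
   fs = pa.hdfs.connect('namenode-host', 8020, user='hdfs')

   # ... or the third-party libhdfs3 driver
   fs3 = pa.hdfs.connect('namenode-host', 8020, user='hdfs', driver='libhdfs3')

   # Both return a HadoopFilesystem exposing the common Filesystem API
   print(fs.ls('/'))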
20 changes: 9 additions & 11 deletions python/pyarrow/__init__.py
@@ -88,7 +88,10 @@
ArrowTypeError)


from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem
from pyarrow.filesystem import Filesystem, LocalFilesystem

from pyarrow.hdfs import HadoopFilesystem
import pyarrow.hdfs as hdfs

from pyarrow.ipc import (Message, MessageReader,
RecordBatchFileReader, RecordBatchFileWriter,
@@ -106,16 +109,7 @@
# ----------------------------------------------------------------------
# 0.4.0 deprecations

import warnings

def _deprecate_class(old_name, new_name, klass, next_version='0.5.0'):
msg = ('pyarrow.{0} has been renamed to '
'{1}, will be removed in {2}'
.format(old_name, new_name, next_version))
def deprecated_factory(*args, **kwargs):
warnings.warn(msg, FutureWarning)
return klass(*args)
return deprecated_factory
from pyarrow.util import _deprecate_class

FileReader = _deprecate_class('FileReader',
'RecordBatchFileReader',
@@ -136,3 +130,7 @@ def deprecated_factory(*args, **kwargs):
InMemoryOutputStream = _deprecate_class('InMemoryOutputStream',
'BufferOutputStream',
BufferOutputStream, '0.5.0')

# Backwards compatibility with pyarrow < 0.6.0
HdfsClient = _deprecate_class('HdfsClient', 'pyarrow.hdfs.connect',
hdfs.connect, '0.6.0')
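
With this shim, existing code that calls ``pa.HdfsClient(...)`` keeps working for now but emits a ``FutureWarning`` pointing at ``pyarrow.hdfs.connect`` (removal slated for 0.6.0). A minimal standalone sketch of the deprecation-factory pattern, mirroring the helper removed above rather than the exact contents of ``pyarrow.util``:

.. code-block:: python

   import warnings

   def _deprecate_class(old_name, new_name, klass, next_version='0.5.0'):
       # Build the warning message once; return a factory that warns and
       # then delegates to the real class or function.
       msg = ('pyarrow.{0} has been renamed to {1}, will be removed in {2}'
              .format(old_name, new_name, next_version))

       def deprecated_factory(*args, **kwargs):
           warnings.warn(msg, FutureWarning)
           # Forwarding **kwargs as well is an assumption; the removed
           # helper above only passed *args through.
           return klass(*args, **kwargs)

       return deprecated_factory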
140 changes: 94 additions & 46 deletions python/pyarrow/filesystem.py
@@ -17,9 +17,9 @@

from os.path import join as pjoin
import os
import posixpath

from pyarrow.util import implements
import pyarrow.lib as lib


class Filesystem(object):
@@ -44,6 +44,12 @@ def delete(self, path, recursive=False):
"""
raise NotImplementedError

def rm(self, path, recursive=False):
"""
Alias for Filesystem.delete
"""
return self.delete(path, recursive=recursive)

def mkdir(self, path, create_parents=True):
raise NotImplementedError

@@ -96,6 +102,12 @@ def read_parquet(self, path, columns=None, metadata=None, schema=None,
return dataset.read(columns=columns, nthreads=nthreads,
use_pandas_metadata=use_pandas_metadata)

def open(self, path, mode='rb'):
"""
Open file for reading or writing
"""
raise NotImplementedError

@property
def pathsep(self):
return '/'
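
The ``open`` / ``read_parquet`` pair added to the base class is the surface downstream code programs against. A rough usage sketch, assuming a concrete filesystem obtained from ``pyarrow.hdfs.connect`` and made-up host and paths:

.. code-block:: python

   import pyarrow as pa

   # Host, port, and paths below are placeholders.
   fs = pa.hdfs.connect('namenode-host', 8020)

   # open() returns a file-like object
   with fs.open('/tmp/example.bin', 'rb') as f:
       head = f.read(16)

   # read_parquet() builds a ParquetDataset rooted at the path and returns a Table
   table = fs.read_parquet('/warehouse/events', columns=['id', 'ts'])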
@@ -134,6 +146,7 @@ def isfile(self, path):
def exists(self, path):
return os.path.exists(path)

@implements(Filesystem.open)
def open(self, path, mode='rb'):
"""
Open file for reading or writing
@@ -144,68 +157,103 @@ def open(self, path, mode='rb'):
def pathsep(self):
return os.path.sep

def walk(self, top_dir):
"""
Directory tree generator, see os.walk
"""
return os.walk(top_dir)

class HdfsClient(lib._HdfsClient, Filesystem):

class DaskFilesystem(Filesystem):
"""
Connect to an HDFS cluster. All parameters are optional and should
only be set if the defaults need to be overridden.

Authentication should be automatic if the HDFS cluster uses Kerberos.
However, if a username is specified, then the ticket cache will likely
be required.

Parameters
----------
host : NameNode. Set to "default" for fs.defaultFS from core-site.xml.
port : NameNode's port. Set to 0 for default or logical (HA) nodes.
user : Username when connecting to HDFS; None implies login user.
kerb_ticket : Path to Kerberos ticket cache.
driver : {'libhdfs', 'libhdfs3'}, default 'libhdfs'
Connect using libhdfs (JNI-based) or libhdfs3 (3rd-party C++
library from Pivotal Labs)

Notes
-----
The first time you call this method, it will take longer than usual due
to JNI spin-up time.

Returns
-------
client : HDFSClient
Wraps Dask filesystem implementations like s3fs, gcsfs, etc.
"""

def __init__(self, host="default", port=0, user=None, kerb_ticket=None,
driver='libhdfs'):
self._connect(host, port, user, kerb_ticket, driver)
def __init__(self, fs):
self.fs = fs

@implements(Filesystem.isdir)
def isdir(self, path):
return lib._HdfsClient.isdir(self, path)
raise NotImplementedError("Unsupported file system API")

@implements(Filesystem.isfile)
def isfile(self, path):
return lib._HdfsClient.isfile(self, path)
raise NotImplementedError("Unsupported file system API")

@implements(Filesystem.delete)
def delete(self, path, recursive=False):
return lib._HdfsClient.delete(self, path, recursive)
return self.fs.rm(path, recursive=recursive)

@implements(Filesystem.mkdir)
def mkdir(self, path, create_parents=True):
return lib._HdfsClient.mkdir(self, path)
def mkdir(self, path):
return self.fs.mkdir(path)

def ls(self, path, full_info=False):
@implements(Filesystem.open)
def open(self, path, mode='rb'):
"""
Open file for reading or writing
"""
Retrieve directory contents and metadata, if requested.
return self.fs.open(path, mode=mode)

Parameters
----------
path : HDFS path
full_info : boolean, default False
If False, only return list of paths
def ls(self, path, detail=False):
return self.fs.ls(path, detail=detail)

Returns
-------
result : list of dicts (full_info=True) or strings (full_info=False)
def walk(self, top_path):
"""
Directory tree generator, like os.walk
"""
return self.fs.walk(top_path)


class S3FSWrapper(DaskFilesystem):

@implements(Filesystem.isdir)
def isdir(self, path):
try:
contents = self.fs.ls(path)
if len(contents) == 1 and contents[0] == path:
return False
else:
return True
except OSError:
return False

@implements(Filesystem.isfile)
def isfile(self, path):
try:
contents = self.fs.ls(path)
return len(contents) == 1 and contents[0] == path
except OSError:
return False

def walk(self, path, refresh=False):
"""
Directory tree generator, like os.walk

Generator version of what is in s3fs, which yields a flattened list of
files
"""
return lib._HdfsClient.ls(self, path, full_info)
path = path.replace('s3://', '')
directories = set()
files = set()

for key in list(self.fs._ls(path, refresh=refresh)):
path = key['Key']
if key['StorageClass'] == 'DIRECTORY':
directories.add(path)
elif key['StorageClass'] == 'BUCKET':
pass
else:
files.add(path)

# s3fs creates duplicate 'DIRECTORY' entries
files = sorted([posixpath.split(f)[1] for f in files
if f not in directories])
directories = sorted([posixpath.split(x)[1]
for x in directories])

yield path, directories, files

for directory in directories:
for tup in self.walk(directory, refresh=refresh):
yield tup
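
For the Dask-style wrappers, the intended flow is to hand an existing s3fs (or similar) filesystem object to the wrapper and then use the common ``Filesystem`` API. A hedged sketch, with bucket and prefix names made up:

.. code-block:: python

   import s3fs
   from pyarrow.filesystem import S3FSWrapper

   # Wrap an s3fs filesystem; credentials come from the usual AWS configuration.
   fs = S3FSWrapper(s3fs.S3FileSystem())

   # Calls delegate to the wrapped object: ls, open, rm, mkdir, ...
   print(fs.ls('my-bucket/some/prefix'))

   # walk() reshapes the flat s3fs listing into (path, directories, files)
   for path, directories, files in fs.walk('my-bucket/some/prefix'):
       print(path, directories, files)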