Support using pyarrow for hdfs #3123

Merged
jcrist merged 16 commits into dask:master from jcrist:pyarrow-hdfs__TEST_HDFS__
Feb 1, 2018

Conversation

@jcrist
Member

@jcrist jcrist commented Jan 31, 2018

Adds support for using pyarrow instead of hdfs3 for hdfs integration. By default the first installed library in [hdfs3, pyarrow] is used. To explicitly set which driver to use, users can set hdfs_driver with dask.set_options:

# Use pyarrow for hdfs integration
with dask.set_options(hdfs_driver='pyarrow'):
    df = dd.read_csv('hdfs:///path/to/*.csv')

Since a user is unlikely to want to use both at the same time, this seemed like the cleanest way to configure it.

Summary of changes:

  • Update dask.bytes to support both hdfs driver options
  • Add a hdfs filesystem using pyarrow. Note that this requires pyarrow development version due to several bugs in the latest release.
  • Update the tests to work with both/either driver installed.
  • Add a glob implementation for the pyarrow implementation. This was copied (and heavily modified) from the version in the standard library. As such I've copied over the relevant license, mirroring how this was done in distributed/threadpoolexecutor.py.
  • Update the test infrastructure to support libhdfs and libhdfs3 concurrently.
  • Update the relevant documentation.

Fixes #3046.
Fixes #1880.

@jcrist
Member Author

jcrist commented Jan 31, 2018

cc @mrocklin, @martindurant for review.

    PYARROW_DRIVER = LooseVersion(pyarrow.__version__) >= _MIN_PYARROW_VERSION_SUPPORTED
except ImportError:
    PYARROW_DRIVER = False
    pyarrow = None
Member

I like this approach
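The gate quoted above is the usual optional-dependency pattern: attempt the import, check the version, and fall back cleanly. A minimal sketch of the same idea, where `MIN_VERSION` and `parse_version` are illustrative stand-ins for dask's `_MIN_PYARROW_VERSION_SUPPORTED` and `LooseVersion`:

```python
MIN_VERSION = (0, 8, 1)  # stand-in for _MIN_PYARROW_VERSION_SUPPORTED

def parse_version(version):
    # Compare only the leading numeric components, so a dev release
    # like '0.8.1.dev81' parses as (0, 8, 1).
    parts = []
    for part in version.split("."):
        if not part.isdigit():
            break
        parts.append(int(part))
    return tuple(parts)

try:
    import pyarrow
    PYARROW_DRIVER = parse_version(pyarrow.__version__) >= MIN_VERSION
except ImportError:
    # pyarrow not installed: the driver is simply unavailable
    pyarrow = None
    PYARROW_DRIVER = False
```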

hdfs.rm(basedir, recursive=True)
hdfs.mkdir(basedir)

yield HDFSDriver(hdfs, request.param)
Member

Maybe instead do the following?

with dask.set_options(hdfs_driver=request.param):
    yield hdfs

This would centralize the dask.set_options code and remove the need for the HDFSDriver class.

(I may be missing something though)

Member

Generally agree that it would be nice to have this match actual usage as accurately as possible.
Would it work to replace the driver.fs.open calls below with open_files? Then each test can run in one context.

Member Author

I went with Matt's approach, this seemed cleaner and we already test that the set_options bit results in the correct driver.

@mrocklin
Member

mrocklin commented Feb 1, 2018

cc @wesm in case he's interested

@@ -1,2 +1,2 @@
-docker exec -it $CONTAINER_ID conda install -y -q dask hdfs3 pyarrow -c conda-forge
+docker exec -it $CONTAINER_ID conda install -y -q dask hdfs3 pyarrow -c twosigma -c conda-forge
Member

Does twosigma need to be ordered first here? I guess this will go away with time.

Member Author

Yes. We want pyarrow from the nightly builds (for now). Conda channels are prioritized in the order given.

import pyarrow as pa


class HDFS3Wrapper(pa.filesystem.DaskFileSystem):
Member

This is for arrow's libhdfs3?
I know this is just moved from above, but it's not obvious to me what it's doing, while the class below seems to have all the required behaviour.

Member Author

This is for wrapping hdfs3's filesystem to be used inside pyarrow. I added a comment that should clarify this.
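The wrapper's job is plain delegation: present an hdfs3 client through the filesystem interface pyarrow expects (the real class subclasses pa.filesystem.DaskFileSystem, a pyarrow-0.8-era API). A dependency-free sketch of the pattern; the method names and the `info(...)['kind']` protocol are modeled on hdfs3, not copied from dask:

```python
class HDFS3Wrapper:
    """Sketch of the delegation pattern: forward filesystem calls to a
    wrapped hdfs3-style client so pyarrow can consume it. The real
    class subclasses pyarrow's DaskFileSystem."""

    def __init__(self, fs):
        self.fs = fs  # the underlying hdfs3-style client

    def isdir(self, path):
        return self.fs.info(path)["kind"] == "directory"

    def isfile(self, path):
        return self.fs.info(path)["kind"] == "file"

    def open(self, path, mode="rb"):
        # Pass straight through to the wrapped client
        return self.fs.open(path, mode=mode)
```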

        return sorted(_glob(self.fs, path))

    def mkdirs(self, path):
        return self.fs.mkdir(path)
Member

This will fail if the parent directory does not exist, I think - unlike the usual understanding of mkdirs.

Member Author

libhdfs always makes intermediate directories. I added an (ignored) keyword that arrow supports that should better document the intention here.

# Copyright 2001-2018 Python Software Foundation; All Rights Reserved


def _glob(fs, pathname):
Member

Do you think it's useful to export glob for other file-systems?
(I know there are issues on the glob implementation elsewhere)
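A filesystem-generic glob can indeed be factored out: the core idea is matching a wildcard path component against a directory listing with fnmatch. The single-component sketch below illustrates this (the real implementation recurses over nested wildcards); `simple_glob` and the `fs.ls` protocol here are assumptions for illustration, not dask's API:

```python
import fnmatch
import posixpath

def simple_glob(fs, pattern):
    # Match only the final path component against the listing of its
    # parent directory; nested wildcards would require recursing over
    # each component, as the stdlib glob does.
    dirname, basename = posixpath.split(pattern)
    return sorted(
        path
        for path in fs.ls(dirname)
        if fnmatch.fnmatch(posixpath.basename(path), basename)
    )
```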


assert len(ddf2) == 1000 # smoke test on read


def test_pyarrow_glob(pa_hdfs):
Member

Here you show how the globs are different, but we would like them to be the same. Perhaps a test across engines showing that at least simpler globs are the same?

Member Author

I moved the glob implementation out into its own file, and applied it to hdfs3 as well (which had inconsistent behavior). Modified the test to test both drivers.

.. _s3fs: http://s3fs.readthedocs.io/
.. .. _azure-data-lake-store-python: https://github.com/Azure/azure-data-lake-store-python
-.. _gcsfs: https://github.com/martindurant/gcsfs/
+.. _gcsfs: https://github.com/dask/gcsfs/
Member

good catch

- ``ticket_cache``, ``token``: kerberos authentication
- ``pars``: dictionary of further parameters (e.g., for `high availability`_)

The ``hdfs3`` driver also relies on a few environment variables. For
Member

The hdfs3 driver configuration can also be affected by a few environment variables.

.. _gcsfs: https://github.com/martindurant/gcsfs/
.. _gcloud: https://cloud.google.com/sdk/docs/

At the time of writing, ``gcsfs.GCSFileSystem`` instances pickle including the auth token, so sensitive
Member

This paragraph and the next are outdated and can be removed.

#
# These functions are under copyright by the Python Software Foundation
#
# Copyright 2001-2018 Python Software Foundation; All Rights Reserved
Member Author

It's not clear to me if this is needed (added it just to be safe). The functions below started from a copy-paste from the standard library, but they've been simplified (remove behavior/options we don't need) and heavily modified (support generic filesystems, better function names, cleaner implementation logic, remove duplicate branches, ...) such that they're probably unrecognizable compared to the original. I suppose this still is a derivative work though.

@jcrist
Member Author

jcrist commented Feb 1, 2018

Thanks for the review, I believe all comments have been addressed.

@martindurant
Member

On a quick glance, this looks OK.
You reworked the test fixture because of glob? Well, it probably looks simpler now.

Member

@mrocklin mrocklin left a comment

To the extent that I am able to judge, this seems fine.

I left a few small comments.

echo 'deb-src http://archive.cloudera.com/cdh5/ubuntu/xenial/amd64/cdh xenial-cdh5 contrib' >> /etc/apt/sources.list.d/cloudera.list && \
apt-get update && \
-apt-get install -y -q openjdk-7-jre-headless hadoop-conf-pseudo && \
+apt-get install -y -q sudo openjdk-8-jre-headless hadoop-conf-pseudo libhdfs0 && \
Member

Why is sudo listed here?

Member Author

The new ubuntu docker base image doesn't provide sudo by default anymore, while it is needed (afaict) to set up hdfs on docker.

    def from_hdfs3(cls, fs):
        out = object.__new__(cls)
        out.fs = fs
        return out
Member

Why do we have this?

Member Author

Testing purposes. Wrapping an existing hdfs client in the dask wrapper, rather than creating one from scratch.
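The object.__new__ trick in the hunk above sidesteps the normal constructor (which would open a new connection) and simply attaches an existing client. A generic sketch of the pattern; the class and method names here are illustrative, not dask's:

```python
class ClientWrapper:
    def __init__(self, host):
        # Normal construction path: would connect to a live service,
        # which is expensive (and impossible in this sketch).
        raise RuntimeError("no cluster available in this sketch")

    @classmethod
    def from_existing(cls, fs):
        # Bypass __init__ entirely and attach an already-created
        # client, exactly as from_hdfs3 does above.
        out = object.__new__(cls)
        out.fs = fs
        return out
```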

        return self.fs.mkdir(path, create_parents=True)

    def ukey(self, path):
        return tokenize(path, self.fs.info(path)['last_modified'])
Member

Just checking, but does HDFS already offer content hashing? It may.

Member

$ hdfs dfs -checksum /project1/file.txt
0000020000000000000000003e50be59553b2ddaf401c575f8df6914

Member Author

Afaict it does not. The checksums seem to be for robustness checking (you can turn on/off whether checksums are verified on read), but I don't think it exposes them.

Member

OK, then I retract the comment
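The ukey hunk above captures the conclusion of this exchange: since these clients do not expose HDFS content checksums, the cache key is derived from the path plus modification time instead. A sketch of the idea, with a plain hashlib digest standing in for dask's tokenize:

```python
import hashlib

def ukey(path, last_modified):
    # Hypothetical stand-in for tokenize(path, last_modified): hash the
    # path together with the modification time, so the key changes
    # whenever the file is rewritten.
    return hashlib.md5(f"{path}:{last_modified}".encode()).hexdigest()
```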

f.write('a b\nc d'.encode())

b = db.read_text('hdfs://%s/text.*.txt' % basedir)
result = b.str.strip().str.split().map(len).compute(get=dask.get)
Member

There have been multiprocessing issues with hdfs3 in the past. It might be wise to leave these with the multiprocessing scheduler.

Member Author

Fixed. Looks like there's fork-safety issues with libhdfs (or pyarrow's wrapper of it, not sure). Will file an issue. Testing using the spawn context works fine though, and mimics how things would work with the distributed scheduler.

jcrist added 15 commits February 1, 2018 16:55
- Only support recent Pyarrow version with patches pushed upstream
- Add tests for glob
- Add psf license for glob functionality
Move the glob code out of pyarrow module, and apply it to hdfs3 driver
as well (due to inconsistent behavior between hdfs3 and other glob
implementations). Test that hdfs3 and pyarrow glob matches.
@jcrist jcrist force-pushed the pyarrow-hdfs__TEST_HDFS__ branch from 0780795 to 9f10da8 Compare February 1, 2018 22:58
@mrocklin
Member

mrocklin commented Feb 1, 2018 via email

@jcrist
Member Author

jcrist commented Feb 1, 2018

You might consider doing a test with Client() to simulate normal distributed operation.

Done.

@jcrist
Member Author

jcrist commented Feb 1, 2018

Thanks for the review all. Merging.

@jcrist jcrist merged commit ae5e1d5 into dask:master Feb 1, 2018
@jcrist jcrist deleted the pyarrow-hdfs__TEST_HDFS__ branch February 1, 2018 23:59
@wesm
Contributor

wesm commented Feb 9, 2018

Looking at this late, but thank you for doing this! Some users will appreciate being able to use libhdfs -- it might bear mentioning in the release notes that this allows the official HDFS Java client libraries to be used.

I didn't dig in far enough -- if we used the Arrow Parquet support, would the underlying hdfs client handle (faster) be passed down to pyarrow.parquet.read_parquet or would a wrapper object be passed (slower)?

@jcrist
Member Author

jcrist commented Feb 9, 2018

Some users will appreciate being able to use libhdfs

Heh, I was one of those users. libhdfs3 doesn't support at-rest encryption, so we needed support for libhdfs.

I didn't dig in far enough -- if we used the Arrow Parquet support, would the underlying hdfs client handle (faster) be passed down to pyarrow.parquet.read_parquet or would a wrapper object be passed (slower)?

Correct. The hdfs3 driver will use a wrapper, but the pyarrow driver will pass the pyarrow hdfs filesystem directly.

@alex959595

alex959595 commented Mar 2, 2018

When I run the code mentioned above,

with dask.set_options(hdfs_driver='pyarrow'):
    df = dd.read_csv('hdfs:///path/to/*.csv')

I get this error

RuntimeError: pyarrow version >= '0.8.1.dev81' required for hdfs driver support

The latest release of pyarrow that I was able to find was pyarrow 0.8.0

I was wondering if someone could point me in the right direction of how to get later versions.
Thanks

@jcrist @mrocklin

@wesm
Contributor

wesm commented Mar 2, 2018

You can install nightlies on Linux with conda install pyarrow -c twosigma. The 0.9.0 release will hopefully be out by mid-March

@alex959595

Thanks for the quick response. I tried to install it, but I'm running into an error on the import.

from pyarrow.lib import cpu_count, set_cpu_count
ImportError: /home/awatson/anaconda3/lib/python3.6/site-packages/pyarrow/lib.cpython-36m-x86_64-linux-gnu.so: undefined symbol: _ZN5arrow2py17ConvertPySequenceEP7_objectRKSt10shared_ptrINS_8DataTypeEEPNS_10MemoryPoolEPS3_INS_5ArrayEE

@wesm
Contributor

wesm commented Mar 2, 2018

@alex959595 can you show the installed arrow-cpp, parquet-cpp, and pyarrow versions? It looks like there's a problem with the nightlies. cc @cpcloud

@alex959595

@wesm arrow-cpp 0.8.0, parquet 1.4.0.pre, pyarrow 0.8.0+151.nightly

@wesm
Contributor

wesm commented Mar 5, 2018

OK, from the look of https://anaconda.org/twosigma/pyarrow/files the nightly version numbers are messed up after the 0.3.0 JS release tag went out -- @cpcloud can you have a look at what's wrong? Must be related to the issue fixed in apache/arrow@55bdae5

@wesm
Contributor

wesm commented Mar 5, 2018

I think @cpcloud is unavailable today and tomorrow so this may have to wait to get fixed until later in the week

@alex959595

@wesm Thanks for your quick responses, and sounds good!
