
Compatibility with the new Arrow FileSystem implementations #295

Closed
jorisvandenbossche opened this issue May 12, 2020 · 28 comments

@jorisvandenbossche
Contributor

Background

@martindurant as you know, last year we started developing new FileSystem implementations in the Apache Arrow project (https://issues.apache.org/jira/browse/ARROW-767; apache/arrow#4225 is the PR with the initial abstract API, on which you gave feedback). Those developments have some implications for users of fsspec-compatible filesystems, so, as promised (with some delay), I'm opening an issue here to discuss how to handle them (and since fsspec currently holds the pyarrow-compatibility layer, opening an issue here seems appropriate).

To summarize:

  • The "old" filesystems are available under pyarrow.filesystems (docs). We basically only have a LocalFileSystem and pa.hdfs.HadoopFileSystem as concrete implementations. And in addition, there is the DaskFileSystem which is used by fsspec as base class, see more on that below.
  • The "new" filesystems are available in the pyarrow.fs submodule (docs). Those are python wrappers for the C++ implementations, and currently there are already concrete implementations for local, Hadoop and S3.

So an important difference is that the new filesystems are actual implementations in C++, and pyarrow.fs only provides wrappers for those. This is done for good reasons: those filesystems are a shared implementation used by many different users of the Arrow project (from C, C++, Python, R, Ruby, ..). Further, those filesystems are for example used in the Arrow Datasets project, which enables a bunch of new features in ParquetDataset reading (and also makes it possible to actually query a Parquet dataset from R). Those new filesystems have been an important part of moving the Arrow project forward.

But this also means that the filesystem that pyarrow functions expect is no longer an "interface" you can implement, but it actually needs a filesystem that wraps a C++ filesystem.
(to be clear: all functionality that already existed before is right now still accepting the old filesystems, only the new pyarrow.dataset module already requires the new filesystems. But long term, we want to move consistently to the new filesystems).

Concretely, this means that the feature of fsspec to automatically provide compatibility with pyarrow will no longer work in the future:

if installed, all file-system classes also subclass from pyarrow.filesystem.FileSystem, so can work with any arrow function expecting such an instance

This current compatibility means that eg pyarrow's parquet.read_table/ParquetDataset work with any fsspec filesystem.


Concrete issues

Ideally, we want to keep compatibility for the existing user base that is using fsspec-based filesystems with pyarrow functionality, while at the same time moving completely to our new filesystem implementation internally in pyarrow.
To achieve this, I currently see two (not necessarily mutually exclusive, to be clear) options:

  • Implement a "conversion" for all important fsspec-based filesystems to a pyarrow.fs filesystem object (eg convert a s3fs.S3FileSystem instance with all its configuration into an equivalent pyarrow.fs.S3FileSystem).
    • I suppose this is what we will do for a LocalFileSystem. But for other filesystems, I don't know how faithful such conversions always can be (eg this might be tricky for things like S3? Can an S3 filesytem be fully encoded/roundtripped in an URI?)
    • This option of course has the pre-condition that we actually support the filesystem in question in pyarrow (which is currently limited, although we plan to expand this).
  • Implement a "pyarrow.fs wrapper for fsspec", a C++ FileSystem that calls back into a python object for each of its methods (where this python object then could be any fsspec-compatible filesystem).
    • Such a "PythonCallBackFilesystem" would allow that pyarrow can actually use the fsspec-based filesystems without converting them. It would provide easy compatibility (easy for the user, to be clear ;)), at the cost of performance (compared to pyarrow's native filesystems)
    • We could wrap incoming fsspec filesystems in pyarrow, or fsspec could use such a class as a base class when pyarrow is installed, similar to what is done now.
    • I didn't yet investigate the feasibility of this option, but opened ARROW-8766 for it.
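
To give a rough idea of the first option for S3, here is a minimal sketch (the attribute/constructor mapping is an assumption based on the current s3fs and pyarrow.fs APIs; only a subset of the s3fs configuration maps cleanly, which is exactly the faithfulness concern above):

import pyarrow.fs
import s3fs

def convert_s3fs(fs: s3fs.S3FileSystem) -> pyarrow.fs.S3FileSystem:
    # Map the fsspec-side credentials/config onto pyarrow's constructor.
    # Anything configured via botocore profiles, environment variables or
    # other client_kwargs has no direct equivalent here.
    client_kwargs = fs.client_kwargs or {}
    return pyarrow.fs.S3FileSystem(
        access_key=fs.key,
        secret_key=fs.secret,
        session_token=fs.token,
        region=client_kwargs.get("region_name"),
        endpoint_override=client_kwargs.get("endpoint_url"),
    )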

There is actually also a third option: some concrete fsspec implementations could start to use one of the new pyarrow filesystems as their base, and then those would also be directly usable in pyarrow (but that's not my call to make; it's up to those individual projects, to be clear. For HDFS in fsspec that's probably what we want to do, though, since its implementation already depends on pyarrow).

As mentioned above, those options are not necessarily mutually exclusive. It might depend on the specific filesystem which option is desirable / possible (and the second option could also be a fallback for the first option if pyarrow doesn't support the specific file system).

Thoughts on this? About the feasibility of the specific options? Other options?


Note: the above is about the use case of "users can pass an fsspec filesystem to pyarrow". There is also the use case the other way around of "using pyarrow filesystems where fsspec-compliant filesystems are expected". For this, an fsspec-compliant wrapper around a pyarrow.fs filesystem is probably useful (and I suppose this is something that could live in pyarrow). For this there is https://issues.apache.org/jira/browse/ARROW-7102
Such a wrapper could also provide a richer API for those users who want that with the pyarrow filesystems (since the current pyarrow.fs filesystems are rather bare-bones).
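
A minimal sketch of what such an fsspec-compliant wrapper could look like (the class name is hypothetical and only a few methods are shown):

import pyarrow.fs
from fsspec import AbstractFileSystem

class ArrowFSWrapper(AbstractFileSystem):
    # Hypothetical sketch for ARROW-8780: expose a pyarrow.fs filesystem
    # through the fsspec interface.
    def __init__(self, arrow_fs, **kwargs):
        super().__init__(**kwargs)
        self.arrow_fs = arrow_fs

    def _open(self, path, mode="rb", **kwargs):
        if mode == "rb":
            return self.arrow_fs.open_input_stream(path)
        if mode == "wb":
            return self.arrow_fs.open_output_stream(path)
        raise NotImplementedError(mode)

    def info(self, path, **kwargs):
        finfo, = self.arrow_fs.get_file_info([path])
        return {
            "name": path,
            "size": finfo.size,
            "type": "file" if finfo.is_file else "directory",
        }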

cc @martindurant @TomAugspurger @pitrou

@TomAugspurger
Collaborator

Just to make sure I understand the goals / issues: we'd like Arrow's new datasets API to work well with fsspec filesystems so that users and libraries like Dask can read data from s3 / gcs / azure blob storage using the new datasets API.

And the primary issue is that the new datasets API is interacting with the filesystem in C++, so we really do need a C++ filesystem thing.

Naively, I'd expect something like https://issues.apache.org/jira/browse/ARROW-7102 to be the best solution. Then anyone can come along with an fsspec-based filesystem and pyarrow will have (a hopefully thin) wrapper around it to get the pieces of data it needs about the filesystem.

I haven't looked closely at the new filesystem, but is it doing filesystem operations like listing files from C++? In that case, ARROW-7102 and https://issues.apache.org/jira/browse/ARROW-8766 start to sound very similar, right? The C++ filesystem will need to list the files in a directory, and so it calls back to python, which calls the underlying fsspec-based implementation?

So I'd expect something like what you describe in https://issues.apache.org/jira/browse/ARROW-8766 to work well. Initially, I'm not too worried about performance here; at least for things like s3fs, gcsfs, and adlfs, the overhead of network calls tends to dominate most things.

@jorisvandenbossche
Contributor Author

In that case, ARROW-7102 and ARROW-8766 start to sound very similar right? The C++ filesystem will need to list the files in a directory, and so it calls back to python, which calls to the underlying fsspec-based implementation?

Yeah, maybe ARROW-7102 and ARROW-8766 are basically somewhat duplicative. But the ARROW-7102 issue did not very clearly specify the actual idea you describe above (a C++ filesystem calling back to python to ask the fsspec-based implementation), so I opened ARROW-8766 to track that idea more specifically.

So it might be that you were thinking of that when you opened ARROW-7102, but now with the new filesystems it would not be something as simple as an FSSpecWrapper python class, as it will require an actual C++ class calling into python (so it can no longer be done like the existing pyarrow.filesystem.S3FSWrapper class you mentioned in that issue).

To quote @pitrou from ARROW-7102:

Going forward, two things could be useful:

  • Make a fsspec wrapper for pyarrow.fs
  • Make a pyarrow.fs wrapper for fsspec

It's the second point for which I opened ARROW-8766, but the first point is also useful, and it was for that point that I referenced ARROW-7102 at the end of the top post (when talking about providing an fsspec-compatible class that wraps a pyarrow.fs filesystem), though indeed the issue was not specifically about that. Sorry for the confusion here :)

To try to clear up this situation, I opened ARROW-8780 specifically for the wrapping of pyarrow.fs in an fsspec-compliant object, made ARROW-8766 and ARROW-8780 two subtasks of ARROW-7102, and updated the body of that issue to clarify this (so that ARROW-7102 is now the general "fsspec compatibility" issue on the Arrow side).

@jorisvandenbossche
Contributor Author

Just to make sure I understand the goals / issues: we'd like Arrow's new datasets API to work well with fsspec filesystems so that users and libraries like Dask can read data from s3 / gcs / azure blob storage using the new datasets API.

Yes, right now it's indeed specifically for the Datasets API (but long term we want to use those new filesystems everywhere in pyarrow that needs filesystem interaction, and actually deprecate/remove the "old" pure-python filesystems).

So practically speaking: if Dask adopts the Datasets API for Parquet IO (-> dask/dask#6174), then Dask needs to pass a pyarrow.fs filesystem to the pyarrow.dataset calls. But Dask currently uses fsspec internally to convert the path/storage_options the user specifies into a filesystem/path combo, which thus won't work out of the box with the Datasets API.
For example, from https://docs.dask.org/en/latest/remote-data-services.html, dask users can do:

import dask.dataframe as dd
df = dd.read_csv('s3://bucket/path/to/data-*.csv', storage_options={"key": ..})
df = dd.read_parquet('gcs://bucket/path/to/data-*.parq')

So here the user can only specify the path (and optional storage_options), and not pass an actual fsspec filesystem object (I thought passing a filesystem object was also possible, but I was apparently mistaken here; this actually makes it a bit simpler).
So in theory, in the "ArrowDatasetEngine" version of read_parquet, Dask could process this string path differently, and instead of creating an fsspec filesystem, it could create a pyarrow.fs filesystem (there is a pyarrow.fs.FileSystem.from_uri method; dealing with storage_options might also require some extra helpers on the pyarrow side).
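
For illustration, the URI-parsing route could look like this (from_uri returns a (filesystem, path) tuple; the bucket/path here is made up):

import pyarrow.fs

# Infers the filesystem from the URI scheme and strips the
# scheme/authority from the returned path.
fs, path = pyarrow.fs.FileSystem.from_uri("s3://bucket/path/to/data")
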
However, doing it like this has two downsides:

  • This only works for filesystems actually supported by pyarrow right now (local, hdfs, s3; so not yet gcs or adl). Initially, this might not necessarily be a problem if the "ArrowDatasetEngine" is opt-in, but eventually this can only work if pyarrow supports all relevant filesystems.
  • This adds custom filesystem handling to dask specifically for the pyarrow-based read_parquet, while the rest of dask uses fsspec for this. So I would understand if Dask maintainers were hesitant to do this.

The above is for Dask's use case, but also pandas is using fsspec-compatible file systems (pandas already uses s3fs, and might be switching to use fsspec more generally in the near future, xref pandas-dev/pandas#33549).

And in pyarrow itself, we also currently accept fsspec filesystems in certain cases (and here as an actual object, not parsed from a URI). For example, the top google search result for reading parquet from s3 with python gives basically this solution:

import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()
dataset = pq.ParquetDataset("s3://my_bucket/path/to/data_folder/", filesystem=fs)

This works right now because s3fs subclasses from fsspec, which ensures all its subclasses are also pyarrow.filesystem.FileSystem subclasses. But this won't work anymore once we remove the old filesystems in pyarrow (now, you could certainly say that this isn't fsspec's problem .. ;))
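
(For context, the shim referred to here is roughly equivalent to the following conditional subclassing; a simplified sketch, not the literal fsspec code:)

# If pyarrow is installed, every fsspec filesystem also becomes a
# pyarrow.filesystem.FileSystem, because the abstract base class
# conditionally inherits from the old DaskFileSystem shim.
try:
    from pyarrow.filesystem import DaskFileSystem as _ArrowBase
except ImportError:
    _ArrowBase = object

class AbstractFileSystem(_ArrowBase):
    ...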

@TomAugspurger
Collaborator

Thanks. I'd personally consider

This only works for filesystems actually supported by pyarrow right now

to be a blocker for changing dd.read_parquet(..., engine="arrow") to use the new datasets API.

From my point of view it seems natural for Dask / others to continue to pass an fsspec-based filesystem to Arrow and for it to transparently convert that to an Arrow filesystem for your internal calls. But I'm not the one who'd be implementing it, so take that with a grain of salt :)

@jorisvandenbossche
Contributor Author

jorisvandenbossche commented Jun 10, 2020

In the meantime, we have made progress on the "pyarrow.fs filesystem wrapping an fsspec filesystem" option. Antoine implemented the low-level callback details, and I now have a PR that creates a "filesystem handler" based on an fsspec-compatible filesystem: apache/arrow#7395

Feedback is very welcome, and it would be especially interesting if someone could test it with an actual s3 or azure filesystem (I currently only tested it with a local and in-memory filesystem, and a minio server).
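
For those wanting to try it, usage would look roughly like this (PyFileSystem and FSSpecHandler are the names from the PR):

import fsspec
from pyarrow.fs import PyFileSystem, FSSpecHandler

# Wrap any fsspec-compatible filesystem so pyarrow sees a pyarrow.fs
# filesystem; each C++ filesystem call is forwarded to the handler.
fsspec_fs = fsspec.filesystem("memory")
arrow_fs = PyFileSystem(FSSpecHandler(fsspec_fs))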


Now, that is for wrapping an fsspec filesystem. But I would also still be interested in thoughts on actually converting an fsspec filesystem to a native pyarrow filesystem (for the ones we support). Eg, would it be possible to do this robustly / faithfully for something like S3? Can all required information be encoded in a URI, for example, which could then be parsed by pyarrow? (that might be hard with all the different ways to configure/store credentials?)
(cc @martindurant)

@martindurant
Member

Quick reply: minio is a good alternative to a "real" S3, I would trust it for testing; even moto is very nearly complete.

The rest will take more thinking, but I doubt you could encode all the options for S3 in a URL. s3fs does support including the key/secret in the URL, but nothing else, and this is rarely used given the many other ways to provide credentials.

@fhoering

@martindurant @jorisvandenbossche
Many thanks for starting the effort on converging the new pyarrow fs implementation and fsspec.
AFAIU you now plan to wrap the fsspec fs in the new arrow fs to be able to use the arrow dataset api directly in Dask.

What about the other way round? Wrapping the new arrow fs in fsspec?

I'm asking this because I'm (still) interested in having a generic, easy-to-use FS which supports both HDFS & S3. What currently exists with S3FSWrapper in pyarrow and the old pyarrow filesystem is nearly what I need.

I also like the fsspec idea and I think the fsspec api is much easier to use than the new pyarrow fs api.
But from what I understand this HDFS wrapper will be removed/broken soon: https://github.com/intake/filesystem_spec/blob/master/fsspec/registry.py#L124

What are your thoughts about this ?

@jorisvandenbossche
Contributor Author

What about the other way round? Wrapping the new arrow fs in fsspec?

See the final note of the top post ("the above is about the use case of "users can pass an fsspec filesystem to pyarrow". There is also the use case the other way around of "using pyarrow filesystems where fsspec-compliant filesystems are expected" ... ").
So yes, that's also an interesting use case, but this issue is primarily about ensuring fsspec filesystems can continue being used with pyarrow.
For the other way around, we have https://issues.apache.org/jira/browse/ARROW-7102 and https://issues.apache.org/jira/browse/ARROW-8780 on the pyarrow side (we could maybe open a new issue here as well). And contributions in this area are very welcome. One thing to decide is where this could live (eg in fsspec or in pyarrow).

@fhoering

fhoering commented Jun 24, 2020

OK. Thanks. I will watch those tickets.

It would be nice if this could follow the pattern where fsspec really only provides the interface and maybe some helpers (to avoid pulling unnecessary dependencies into client projects), and then each project that provides the functionality pulls in fsspec and implements it.

So to me this would mean that arrow provides the S3/HDFS implementations and pulls in fsspec, and the client project then pulls in fsspec for the interfaces (+ maybe some helpers) + pyarrow + other libraries that provide other interesting fs implementations.

@martindurant
Member

It would be nice if this could follow the pattern where fsspec really only provides the interface and maybe some helpers (to avoid pulling unnecessary dependencies into client projects), and then each project that provides the functionality pulls in fsspec and implements it.

That is exactly what I had in mind when writing fsspec :)
Note that we already have an independent S3 implementation too (s3fs), but our HDFS implementation uses arrow's (I think this was mentioned). fsspec currently lacks a way to register implementations without importing them or updating fsspec.registry, but we ought to move to entrypoints, so that you could import whichever S3 library you have installed, in case there are multiple compatible ones.
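
As a sketch of what entrypoint-based registration might look like (purely hypothetical at this point, including the entrypoint group name and package names):

from setuptools import setup

# A third-party package advertises its filesystem class so fsspec can
# discover it lazily, without importing the package up front or editing
# fsspec.registry.
setup(
    name="my-s3-package",
    entry_points={
        "fsspec.specs": ["s3alt = my_s3_package:AltS3FileSystem"],
    },
)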

@fhoering

Yes, currently I'm actually using S3FS + HDFS via arrow. It works pretty well. Thanks.
But I understood the goal was to move to arrow's S3 implementation.

@martindurant
Member

But I understood the goal was to move to arrow's S3 implementation.

Maybe arrow wants to do this, but s3fs will remain a supported project with a large user base. I understand that arrow would like to implement it in C++ to provide cross-language support, but the pydata ecosystem appreciates a simple pure-python package.
(I maintain fsspec and s3fs, and don't speak for arrow)

@ldacey

ldacey commented Aug 2, 2020

It seems like the pyarrow.dataset feature at least partially supports fsspec and Azure Blob.

After many guess-and-check testing attempts, I finally got this to work (using fsspec):

import pyarrow.dataset as ds
import fsspec

fs = fsspec.filesystem(protocol='abfs', 
                       account_name=login, 
                       account_key=password)

dataset = ds.dataset("abfs://analytics/test/test.parquet", format="parquet", filesystem=fs)
dataset.to_table(columns=['id', 'event_value'], filter=ds.field('event_value') == 'closed').to_pandas()

But if I try to read a partitioned dataset instead of a single file then I run into this error:

~/.local/lib/python3.7/site-packages/pyarrow/fs.py in get_file_info_selector(self, selector)
    159         infos = []
    160         selected_files = self.fs.find(
--> 161             selector.base_dir, maxdepth=maxdepth, withdirs=True, detail=True
    162         )
    163         for path, info in selected_files.items():

/opt/conda/lib/python3.7/site-packages/fsspec/spec.py in find(self, path, maxdepth, withdirs, **kwargs)
    369         # TODO: allow equivalent of -name parameter
    370         out = set()
--> 371         for path, dirs, files in self.walk(path, maxdepth, **kwargs):
    372             if withdirs:
    373                 files += dirs

/opt/conda/lib/python3.7/site-packages/fsspec/spec.py in walk(self, path, maxdepth, **kwargs)
    324 
    325         try:
--> 326             listing = self.ls(path, detail=True, **kwargs)
    327         except (FileNotFoundError, IOError):
    328             return [], [], []

I couldn't find any documentation for Azure Blob for this new dataset feature, but pq.ParquetDataset works fine, Dask and pandas work, and the DaskFileSystem seems to connect fine (I can list directories and all of that):

file_sys = pyarrow.filesystem.DaskFileSystem(fs)
file_sys.ls(path='analytics/test/')

I just wonder if I am doing something wrong - it seems like I am so close right now and I want to experiment with non-Hive partitioning and the row filtering.

@pitrou

pitrou commented Aug 2, 2020

@ldacey Can you report a bug to Arrow? See https://arrow.apache.org/docs/developers/contributing.html#report-bugs-and-propose-features for how to do this.

@ldacey

ldacey commented Aug 2, 2020

@pitrou I edited the original issue I made about this subject, from when I thought that the dataset API would not work at all with Azure Blob. Now I know that it does work (a bit), but perhaps there is some conflict between fsspec and the pyarrow filesystem handler, since they both pass detail=True arguments.

TypeError: ls() got multiple values for keyword argument 'detail'
https://issues.apache.org/jira/browse/ARROW-9514
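
Schematically, the clash comes from fsspec's walk() forwarding **kwargs into ls(), which already receives detail=True explicitly (a simplified reproduction, not the actual fsspec code):

def ls(path, detail=True, **kwargs):
    ...

def walk(path, **kwargs):
    # kwargs already contains detail=True (passed by pyarrow's handler
    # into find/walk), and ls() is called with detail=True again:
    return ls(path, detail=True, **kwargs)  # TypeError: multiple values

walk("analytics/test", detail=True)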

@ldacey

ldacey commented Aug 3, 2020

FYI - I reverted to 0.17.1 to test this again.

read_table works again (on version 1.0.0 pq.read_table was not working - the same error causes it to fail: TypeError: ls() got multiple values for keyword argument 'detail')

Reading a single parquet file with pyarrow.dataset no longer works on 0.17.1, though. This does work on 1.0.0:

dataset = ds.dataset("abfs://analytics/test/test.parquet", format="parquet", filesystem=fs)

TypeError: filesystem argument must be a FileSystem instance or a valid file system URI

Dask seems to work either way.

@martindurant
Member

@jorisvandenbossche , are you still following this?

@jorisvandenbossche
Contributor Author

@martindurant yes, is there anything specific to look at / answer? (the recent comments regarding Azure are further discussed in https://issues.apache.org/jira/browse/ARROW-9514 I think?)

@martindurant
Member

Indeed - and it seems that was actually all fine. I think this can be closed.

@jorisvandenbossche
Contributor Author

One additional aspect (but I can also open a new issue for that) is that fsspec will need to update its hdfs filesystem implementation at some point (right now it wraps pa.hdfs.HadoopFileSystem, which will be deprecated in favor of pa.fs.HadoopFileSystem). This relates to implementing an fsspec-compatible wrapper for the new arrow filesystems (a wrapper "the other way around"), as such a general wrapper could also be the base for the hdfs wrapper.

@martindurant
Member

What is the deprecation timeline? I suggest a new issue when warnings start showing up.

@jorisvandenbossche
Contributor Author

jorisvandenbossche commented Sep 11, 2020

What is the deprecation timeline? I suggest a new issue when warnings start showing up.

The timeline is that it will be actually deprecated (with warning) in pyarrow 2.0, to be released in a couple of weeks.
I would suggest that fsspec actually stop using it before we start raising a warning.

@martindurant
Member

So what is the status of the new implementation? Can it be used with/from fsspec?
I don't think there's a problem leaving the warning there; this will only show up for cases that explicitly call HDFS via fsspec.

@fhoering

fhoering commented Sep 11, 2020

Maybe you should replace it with https://github.com/dask/hdfs3. I was using this before and it was working great.
(I actually migrated to pyarrow because of the deprecation warning)

@martindurant
Member

hdfs3 used to be the way to do this (indeed, it was the very first of the fsspec-like implementations). However, libhdfs3 (the C++ library it depends on) has proven difficult to maintain, and incomplete for secured connections, which I was not able to solve.

@fhoering

fhoering commented Sep 11, 2020

incomplete for secured connections

We used it with kerberos authentication on our cluster, if you mean that. It worked pretty well. The only thing that didn't work was viewfs, so we implemented that on top of it on our own.

But anyway, never mind the comment. I was a user of hdfs3 and really liked it; then I moved to the old pyarrow fs. It is not that bad, and I see its limitations now that I use it, but imo it was all fixable.
The new pyarrow FS in Python is really cumbersome from my point of view. That's why I would like to see the fsspec wrapper as a public API. I would even implement it, but I don't have much free time for this at the moment.

@martindurant
Member

We used it with kerberos authentication on our cluster, if you mean that.

I believe it was the line-encryption, not the auth, but old history now...

Let's see what @jorisvandenbossche says about the new implementation. It would be great if it could simply be registered with fsspec and work. Then the shim we have here can indeed be dropped.

@jorisvandenbossche
Contributor Author

So what is the status of the new implementation? Can it be used with/from fsspec?

As already explained multiple times in this thread: the pyarrow.fs filesystems have a different API that is not compatible with fsspec (so no, they cannot be used directly with/from fsspec), but ideally someone would write a wrapper to wrap a pyarrow.fs filesystem in an fsspec-compatible object (for which I opened https://issues.apache.org/jira/browse/ARROW-8780, but it's something that could potentially also live in fsspec, I think).

I don't think there's a problem leaving the warning there; this will only show up for cases that explicitly call HDFS via fsspec.

It's not only about HDFS, I think? Also the other filesystems inherit from pyarrow.filesystem.FileSystem if pyarrow is installed?

hdfs3 used to be the way to do this (indeed, it was the very first of the fsspec-like implementations). However, libhdfs3 (the C++ library it depends on) has proven difficult to maintain, and incomplete for secured connections, which I was not able to solve.

And libhdfs3 is also no longer maintained, AFAIK. Also, pyarrow dropped the optional libhdfs3 driver support, and the new pyarrow.fs.HadoopFileSystem only supports the JNI driver.
