Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for reading remote storage systems #811

Closed
wants to merge 17 commits into from

Conversation

yjshen
Copy link
Member

@yjshen yjshen commented Aug 2, 2021

Which issue does this PR close?

Closes #616

Rationale for this change

Currently, we can only read files from LocalFS since we use std::fs in ParquetExec. It would be nice to add support to read files that reside on storage sources such as HDFS, Amazon S3, etc.

What changes are included in this PR?

Introduce ObjectStore API as an abstraction of the underlying storage systems, such as local filesystem, HDFS, S3, etc. And make the ObjectStore implementation pluggable through the ObjectStoreRegistery in Datafusion's ExecutionContext.

Are there any user-facing changes?

Users can provide implementations for the ObjectStore trait, register it into ExecutionContext's registry, and run queries against data that resides in remote storage systems as well as local fs (the only default implementation of ObjectStore).

@github-actions github-actions bot added the datafusion Changes in the datafusion crate label Aug 2, 2021
@houqp houqp added the enhancement New feature or request label Aug 2, 2021
@houqp
Copy link
Member

houqp commented Aug 2, 2021

Thank you @yjshen , this is huge! I will help review it tomorrow.

@yjshen
Copy link
Member Author

yjshen commented Aug 2, 2021

Thank you @yjshen , this is huge! I will help review it tomorrow.

Thanks, @houqp. Currently, it's just a draft and not ready for a full review. I made it out early to get your idea to see if it's on the right track.

Copy link
Member

@houqp houqp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like there is some duplication on file content reading and listing responsibilities between DataSource, SourceDescBuilder and ProtocolHandler traits. Would be good to give more thoughts on how these abstractions would interact with each other.

datafusion/src/datasource/protocol_registry.rs Outdated Show resolved Hide resolved
datafusion/src/datasource/datasource2.rs Outdated Show resolved Hide resolved
datafusion/src/datasource/datasource2.rs Outdated Show resolved Hide resolved
datafusion/src/execution/context.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking quite cool @yjshen -- thank you! I left some comments of things that might be worth considering.

Also I wonder if you have thought about trying to make this interface async somehow? I realize the underlying parquet reader isn't async (yet) but I think that is the direction things are heading in Rust I/O land

datafusion/src/datasource/protocol_registry.rs Outdated Show resolved Hide resolved
datafusion/src/datasource/protocol_registry.rs Outdated Show resolved Hide resolved
datafusion/src/datasource/protocol_registry.rs Outdated Show resolved Hide resolved
datafusion/src/execution/context.rs Outdated Show resolved Hide resolved
datafusion/src/datasource/protocol_registry.rs Outdated Show resolved Hide resolved
datafusion/src/datasource/protocol_registry.rs Outdated Show resolved Hide resolved
datafusion/src/datasource/local.rs Outdated Show resolved Hide resolved
datafusion/src/execution/context.rs Outdated Show resolved Hide resolved
Copy link
Contributor

@rdettai rdettai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for this interesting proposition @yjshen!
I agree with @alamb that async should be taken into account, especially for fetching the file list and metadata which are typically high latency but with little processing. But here you cannot use async because the file list and statistics are materialized at the ParquetTable creation level which is too early. This early materialization will also be problematic with buckets that have thousands of files:

  • getting the metadata from all parquet files will be too long
  • it can be interesting to leave the listing to the last moment, so that in case you implement some partition pruning later on, you can list the files only in the partitions you are interested in.

Overall I would prefer (but this is just my opinion) a higher level abstraction in which we can also plug catalogs such as Delta or Iceberg

datafusion/src/datasource/object_store.rs Outdated Show resolved Hide resolved
@yjshen yjshen marked this pull request as ready for review August 11, 2021 06:49
@yjshen yjshen changed the title Source ext for remote files read Add support for reading remote storage systems Aug 11, 2021
@yjshen yjshen requested review from alamb and houqp August 11, 2021 07:13
@yjshen
Copy link
Member Author

yjshen commented Aug 11, 2021

@houqp @alamb I've done with the original implementation by abstracting file listing/reading logic into ObjectStore and ObjectReader, and I think it's ready for review again.

/// Objct Reader for one file in a object store
pub trait ObjectReader {
    /// Get reader for a part [start, start + length] in the file
    fn get_reader(&self, start: u64, length: usize) -> Box<dyn Read>;

    /// Get lenght for the file
    fn length(&self) -> u64;
}

/// A ObjectStore abstracts access to an underlying file/object storage.
/// It maps strings (e.g. URLs, filesystem paths, etc) to sources of bytes
pub trait ObjectStore: Sync + Send + Debug {
    /// Returns the object store as [`Any`](std::any::Any)
    /// so that it can be downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any;

    /// Returns all the files with filename extension `ext` in path `prefix`
    fn list_all_files(&self, prefix: &str, ext: &str) -> Result<Vec<String>>;

    /// Get object reader for one file
    fn get_reader(&self, file_path: &str) -> Result<Arc<dyn ObjectReader>>;
}

Currently, there are several things remaining (I suppose that are not blockers for this PR, please correct me if get something wrong):

  • Async listing (list_all_files) as well as async reading (get_reader).
  • Figure out for ballista how to register ObjectStore in the client and pass the registration on to executors.
  • Make JSON / CSV read from ObjectReader as well.

@yjshen
Copy link
Member Author

yjshen commented Aug 11, 2021

Regarding the async part, should I just make async fn list_all_files? and wait for parquet / csv reading asynced and proceed?

Copy link
Contributor

@rdettai rdettai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks again @yjshen !

My high level feeling is still that we lack an abstraction for the list of files (catalog).

.unwrap()
.object_store_registry
.store_for_path(root_path);
let root_desc = Self::get_source_desc(root_path, object_store.clone(), "parquet");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in my experience, it is too strict to expect parquet files to have the parquet suffix

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, it was restricted to parquet suffix in the original implementation, so I moved it here. Probably we could make it as an argument and ask from the user?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or just list all files that not start with _ ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @rdettai , I think we can also address this as a quick follow up PR since this is also the old behavior.

datafusion/src/datasource/mod.rs Outdated Show resolved Hide resolved
datafusion/src/datasource/mod.rs Outdated Show resolved Hide resolved
@@ -64,7 +66,8 @@ impl CsvFile {
let schema = Arc::new(match options.schema {
Some(s) => s.clone(),
None => {
let filenames = common::build_file_list(&path, options.file_extension)?;
let filenames = LocalFileSystem
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not also allow csv/json files to be fetched using the object_store_registry ? this would make the behavior more consistent, but can definitively be added later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I leave out csv/json for now for simplicity, since their reading logic are quite different from parquet, I prefer to do these as follow-ups.

datafusion/src/datasource/parquet.rs Show resolved Hide resolved
@yjshen
Copy link
Member Author

yjshen commented Aug 19, 2021

@alamb @andygrove @Dandandan @jorgecarleitao @rdettai On making the remote storage system object listing & data reading API async, a design choice occurs. This might be quite important, and I'd love to have your suggestions:

To which level should I propagate async?

This was because once we have async dir listing -> we can have async logical plans & async table provider -> we can have async DataFrame / context API

Two available alternatives are:

  1. Limit async to just listing / metadata_fetch / file read, wrap a sync version over these async and keep most of the user-facing API untouched. (keep the PR lean as possible)
  2. Propogate Async API all the way up and finally change the user-facing API: including DataFrame & ExecutionContext. (which includes huge user-facing API changes ).

Currently, This PR took the first approach by constructing all APIs in ObjectStore / ObjectReader / SourceRootDescriptor natively in async and wrap the async function to a sync one. Trying to keep other parts of the project untouched. Great thanks to @houqp for guiding me through the way.

Does approach 1 make sense to you?

If I take approach 1, how should the sync version function be constructed?

This PR tries to make a wrapper over the async counterparts and keep single logic for each functionality. therefore relies on futures::executor::block_on to bridge async to sync function.

However, this approach is flawed for block_on may block the only thread in tokio, and the future inside won't get a chance to run, therefore hanging forever if the tokio runtime is not a multi-threaded one. (I temporarily change the related test to use #[tokio::test(flavor = "multi_thread", worker_threads = 2)] to avoid hanging). Do you have any suggestions on this?

@yjshen yjshen requested review from alamb and houqp August 19, 2021 07:33
@jorgecarleitao
Copy link
Member

Thanks a lot for taking a good look at this and for the proposal.

Propogate Async API all the way up and finally change the user-facing API: including DataFrame & ExecutionContext

Could you describe which APIs would be affected by this? For example, creating a logical plan would become async because we have to read metadata to build a schema, correct? So, for example, things like df = context.read_parquet(...).await?;, right?

I agree with making the planing async: there is no guarantee that we synchronously have all the information to build the plan in the first place, and imo we should not block because we need to read 50 metadata files from s3.

I agree that this would be a major change. :)

@yjshen
Copy link
Member Author

yjshen commented Aug 19, 2021

Could you describe which APIs would be affected by this?

Mainly API change:

  • execution/context: read(register)_parquet / read(register)_csv / read(register)_json. etc.

Other pub function / trait touched:

  • logical_plan: scan(csv/parquet/json)
  • physical_plan: csv / parquet / json

Upstream dependencies need to change:

  • arrow parquet crate: ChunkReader / Length / ParquetReader

@ehenry2 ehenry2 mentioned this pull request Aug 19, 2021
@alamb
Copy link
Contributor

alamb commented Aug 19, 2021

I am starting to check this out carefully

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First of all, thank you again @yjshen and everyone else who has helped out on this PR.

This PR is pretty massive and I think we should begin breaking it down to merge it in -- the longer it stays open the more potential conflicts it hits as well as the longer before others can start playing with / helping out it.

TLDR; I would be in favor of making the DataFusion planning API async. async planning is inevitable, in my opinion, if we want DataFusion to operate on remote catalogs that have not been locally cached in memory and must be accessed via async I/O.

From my perspective, there are actually several major changes to this PR:

  1. An API to read data (during async ExecutionPlan::execute) from a remote file system
  2. An API to read the metadata from a remote filesystem (e.g. what files exist, read parquet statistics, etc)
  3. Partial rewrite of NDJson, CSV and parquet readers to use the new ObjectStore API

The first is sufficient to do partial reads from S3 / other filesources if you already know what files exist there.

The second is needed to drive DataFusion entirely from a remote data source without having to read/cache a catalog locally.

To which level should I propagate async?

Since execution (calling ExecutionPlan::execute and then collect on the result) in DataFusion is async, I think adding the async read (change 1 above) is a relatively small change and no async propagation is needed.

However, since as of today planning (everything up to calling ExecutionPlan::execute) in Datafusion is not async if we want to support async catalog/metadata access (usecase 2 above) then I think we have no choice but to propagate async all the way into planing.

To be clear, given the direction of database systems in general towards distributed systems I am in favor of plumbing async as far up to planning as needed to allow use of DataFusion with non-local catalogs. However, as you have noted, this is a much larger code change.

The alternate compromise, which you have partly implemented in this PR, is to implement both async and non async versions. This is similar to the approach in the C/C++ filesystem api (props to @nealrichardson for the pointer), which has both having synchronous and asynchronous APIs.

I also posted an announcement to the arrow mailing list about this change for broader visibility.

@@ -269,8 +269,8 @@ mod test {
};
}

#[test]
fn distributed_hash_aggregate_plan() -> Result<(), BallistaError> {
#[tokio::test]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case anyone else is interested, this is what happens if you don't have tokio::test:

failures:

---- planner::test::distributed_hash_aggregate_plan stdout ----
thread 'planner::test::distributed_hash_aggregate_plan' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.10.0/src/runtime/blocking/pool.rs:84:33
stack backtrace:
   0: rust_begin_unwind
             at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/std/src/panicking.rs:515:5
   1: core::panicking::panic_fmt
             at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/panicking.rs:92:14
   2: core::option::expect_failed
             at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/option.rs:1243:5
   3: core::option::Option<T>::expect
             at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/option.rs:351:21
   4: tokio::runtime::blocking::pool::spawn_blocking
             at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.10.0/src/runtime/blocking/pool.rs:84:14
   5: tokio::fs::asyncify::{{closure}}
             at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.10.0/src/fs/mod.rs:119:11
   6: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/future/mod.rs:80:19
   7: tokio::fs::metadata::metadata::{{closure}}
             at /Users/alamb/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.10.0/src/fs/metadata.rs:46:5
   8: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/future/mod.rs:80:19
   9: datafusion::datasource::object_store::local::list_all_async::{{closure}}
             at /Users/alamb/Software/arrow-datafusion/datafusion/src/datasource/object_store/local.rs:148:8
  10: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/a178d0322ce20e33eac124758e837cbd80a6f633/library/core/src/future/mod.rs:80:19
  11: datafusion::datasource::object_store::local::list_all::{{closure}}
             at /Users/alamb/Software/arrow-datafusion/datafusion/src/datasource/object_store/local.rs:111:15

@@ -56,9 +56,9 @@ paste = "^1.0"
num_cpus = "1.13.0"
chrono = "0.4"
async-trait = "0.1.41"
futures = "0.3"
futures = { version = "0.3", features = ["executor"] }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we already have tokio (which has full on executor) I don't think we also need the futures executor so I would like to avoid this new dependency.

I tried removing this change locally and it seems to work

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can use tokio::runtime::Handle::block_on rather than futures::executor::block_on as a way to play as nicely as possible with the tokio executor: https://docs.rs/tokio/1.10.0/tokio/runtime/struct.Handle.html#method.block_on

So something like

Handle::current()
  .block_on(async { .... });

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While using tokio::runtime::Handle::block_on, I'm facing with:

’Cannot start a runtime from within a runtime. This happens because a function (like block_on) attempted to block the current thread while the thread is being used to drive asynchronous tasks.

Since block_on is try_entering an already entered runtime, therefore I changed to future::executor's to avoid panic in the first place. But as I noted before, future::executor::block_on is also flawed here:

However, this approach is flawed for block_on may block the only thread in tokio, and the future inside won't get a chance to run, therefore hanging forever if the tokio runtime is not a multi-threaded one. (I temporarily change the related test to use #[tokio::test(flavor = "multi_thread", worker_threads = 2)] to avoid hanging).

Do you have any suggestions on this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the only real suggestion is to plumb async all the way through to planning (aka remove the non async API)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The alternate compromise, which you have partly implemented in this PR, is to implement both async and non async versions. This is similar to the approach in the C/C++ filesystem api (props to @nealrichardson for the pointer), which has both having synchronous and asynchronous APIs.

How about this alternative to reduce the scope of this PR? i.e. implement both sync and async, but only use sync API to migrate existing code to the new IO abstraction, then work on async propagation as a fast follow up.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other thing I was thinking about was what about adding in the ObjectStore interfaces in one PR and then start hooking that up into the rest of the system / rewrite the existing data sources (like Parquet, etc) as separate PRs.

I think @yjshen has done a great job with this PR showing how everything would hook together, but I do feel like this PR is slightly beyond my ability to comprehend given its size and scope.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am onboard with further reducing the scope by focusing only on the ObjectStore interface :)

collect_statistics: bool,
) -> Result<SourceRootDescriptor> {
let mut results: Vec<Result<PartitionedFile>> = Vec::new();
futures::executor::block_on(async {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above I think you can use tokio::runtime::Handle::current().block_on

results.into_iter().collect();
let partition_files = partition_results?;

// build a list of Parquet partitions with statistics and gather all unique schemas
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is strange to me that the collating of partitions doesn't happen in get_source_desc_async -- it seems like get_source_desc would just be doing the adapting of async --> sync code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the get_source_desc is used to adapting async to sync to stop propagating async to API.

@yjshen
Copy link
Member Author

yjshen commented Aug 20, 2021

@alamb

The alternate compromise, which you have partly implemented in this PR, is to implement both async and non async versions. This is similar to the approach in the C/C++ filesystem api (props to @nealrichardson for the pointer), which has both having synchronous and asynchronous APIs.

If I understand you correctly, do you mean I should tell sync and async implementation apart, with two different logics? Instead of the current wrapper way (sync function wrap over async logic.)?

@ehenry2
Copy link

ehenry2 commented Aug 20, 2021

I have a question on the ThreadSafeRead trait...is there anything prebuilt (or recommendation) to wrap say a tokio AsyncRead or bytes.Buf to easily implement the get_reader_async function? I see the example for the local filesystem using the FileSource2 to wrap the File, but I'm assuming most remote implementations will approach this function implementation with some kind of in-memory buffer or stream. I had some issues figuring this one out trying to implement this for S3 (I'm still a bit new to rust and lifetimes, etc).

@houqp
Copy link
Member

houqp commented Aug 20, 2021

I'm assuming most remote implementations will approach this function implementation with some kind of in-memory buffer or stream.

I think this would need to be handled case by case for different remote store client. It would be helpful to share exactly what client API signatures you are trying to use within get_reader_async.

@jorgecarleitao
Copy link
Member

Could it make sense to write a design doc like @houqp wrote some time ago for the qualified names? Does it feel a sufficiently impactful change to design this a bit before commiting?

@alamb
Copy link
Contributor

alamb commented Aug 21, 2021

Could it make sense to write a design doc like @houqp wrote some time ago for the qualified names?

I think a design doc is a great idea @jorgecarleitao -- it would let us make sure some of the larger points are clear and there is consensus (especially around adding async in various places). @yjshen I am happy to singtart draft such a document if you think it would help.

I am personally very interested in getting the ideas in this PR into DataFusion -- I think it is an important architectural step forward and since I think it will directly help IOx (the project I am working on) I can spend non trivial amounts of time working on it

@yjshen
Copy link
Member Author

yjshen commented Aug 21, 2021

Thank you @houqp @alamb @jorgecarleitao for your great help!

This PR initially contains several functions related to reading data, including the core object store abstraction, a more general scan partitioning abstraction, and some refactoring of parquet scan. At the same time, as I think more and get more valuable input, the scope becomes more extensive. Although I try to maintain PR lean as possible, leaving out some functionality such as JSON/CVS scan, it grows inevitably huge and is hard to review.

I agree we could make the current PR a proof of concept, and I'm happy to break it down into several parts to get the work finally merged.

As for the design doc for the object store API, I can write up a draft proposal first this weekend. Please help to review and revise it when it's available. Thanks again @alamb for offering the help on the doc part :)

@yjshen
Copy link
Member Author

yjshen commented Aug 21, 2021

I've drafted a design doc here: https://docs.google.com/document/d/1ZEZqvdohrot0ewtTNeaBtqczOIJ1Q0OnX9PqMMxpOF8/edit#. Please help to review it. Thanks!

@alamb
Copy link
Contributor

alamb commented Aug 22, 2021

Thanks @yjshen -- the plan you lay out sounds great

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yjshen one way to avoid the async planning code, might be to have an async version of a ParquetTable TableProvider. Would the following structure work for delta-rs?

(Create a RemoteParquetTable provider outside of Datafusion) <-- async
(Create Execution Context)
(Register Table Providers)
(Create LogicalPlan)
(Create ExecutionPlan)
(Call ExecutionPlan::execute) <-- async
(Stream results back from stream) <-- async

where RemoteParquetTable is something that knows how to interact with the ObjectStore and fetch the appropriate metadata for statistics and schema.

                              Planning these                                           
                              Table Providers                    During                
 ┌───────────────────────┐    results in same                   execution              
 │      (non async)      │     ExecutionPlan                 reads data via            
 │     ParquetTable      │──────┐                              ObjectStore             
 │                       │      │      ┌───────────────────────┐                       
 └───────────────────────┘      │      │        (async)        │      ┌───────────────┐
                                ├─────▶│       existing        │─────▶│  ObjectStore  │
 ┌───────────────────────┐      │      │      ParquetExec      │      └───────────────┘
 │         async         │      │      └───────────────────────┘                       
 │  RemoteParquetTable   │      │                                                      
 │                       │──────┘                                                      
 │  fetches metadata on  │                                                             
 │RemoteParquetTable::new│                                                             
 └───────────────────────┘                                                             
                                                                                       
     Planning Time                         Execution Time                              
                                                                                       

This idea is not as nice as the unified framework you have here but it might allow DF to get to the more unified design incrementally

Today, in my mind the general flow of querying goes like this, and trying to add async to the creation of LogicalPlan or ExecutionPlan is a large change, as you have pointed out

(Create Execution Context)
(Register Table Providers)
(Create LogicalPlan)
(Create ExecutionPlan)
(Call ExecutionPlan::execute) <-- async
(Stream results back from stream) <-- async

@yjshen
Copy link
Member Author

yjshen commented Aug 24, 2021

@alamb I might be wrong on this: is it possible to not provide a RemoteParquetTable, but provide a RemoteParquetTableBuilder that uses the ObjectStore API on the async listing but build a ParquetTable asynchronously?

By doing this, we may pass async table building logic from planning API to users' hands, during they construct ParquetTable TableProvider. Then they could register ParquetTable using context::register_table(self, table_ref, provider). Does this volatiles the idiomatic async in Rust?

I think of this from the perspective of ballista, even though I'm not quite familiar with the code there, it seems ballista could only serialize/deserialize known typed TableProviders, therefore RemoteParquetTable outside DataFusion might not be preferable? I think Rust doesn't support runtime reflection?

@yjshen
Copy link
Member Author

yjshen commented Aug 24, 2021

Would the following structure work for delta-rs?

cc @houqp since I'm not familiar with delta-rs.

@alamb
Copy link
Contributor

alamb commented Aug 24, 2021

but provide a RemoteParquetTableBuilder that uses the ObjectStore API on the async listing but build a ParquetTable asynchronously?

That is a really neat idea @yjshen - I hadn't thought of that but it sounds very good

Then they could register ParquetTable using context::register_table(self, table_ref, provider). Does this volatiles the idiomatic async in Rust?

Not in my opinion.

I think Rust doesn't support runtime reflection?

That is correct -- Rust doesn't have built in runtime reflection support -- that type of behavior needs to be added in the application logic

@houqp
Copy link
Member

houqp commented Aug 26, 2021

I also think constructing ParquetTable async before passing to register_table is a good idea. This is how delta-rs implements its daatafusion integration as well.

With regards to ballista table provider protobuf ser/de limitation, I think it's something we need address in the long term, otherwise, it would impossible to support custom table sources in ballista.

@yjshen
Copy link
Member Author

yjshen commented Aug 26, 2021

As a result of previous discussions on this PR as well as in the design doc (updated according to latest reviews as well). I break down this PR into one dedicated API adding PR #950 and a PartitionedFile abstraction PR #932, and left parquet async integrations as follow-ups.

@yjshen
Copy link
Member Author

yjshen commented Sep 18, 2021

I'm closing this PR since most of the functionalities in this one come true or will soon get in. I am excited about the changes taking place.

@yjshen yjshen closed this Sep 18, 2021
@alamb
Copy link
Contributor

alamb commented Sep 19, 2021

Thank you again for all your work in this area @yjshen -- the improvements to DataFusion are amazing!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ballista datafusion Changes in the datafusion crate enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add support for reading distributed datasets
7 participants