
Decouple Streaming Use-Case from File IO Abstractions #7994

Closed
tustvold opened this issue Oct 30, 2023 · 23 comments · Fixed by #8021

@tustvold
Contributor

tustvold commented Oct 30, 2023

Is your feature request related to a problem or challenge?

Currently we accommodate streaming workloads within DataFusion by overloading the file IO abstractions.

This is not always a very good fit and results in a number of workarounds:

As DataFusion gets more sophisticated about handling catalogs, reading/writing partitioned data, caching data, this overloading is getting more and more arcane and hard to reason about, and I think it is overdue we do something to address it.

Describe the solution you'd like

I would like to separate the notions of FileSink and FileScan from a StreamSink and StreamSource, this would allow abstractions that better fit their respective use-cases.

In particular

  • FileSink and FileScan can focus on reading/writing partitioned immutable files following standard big data practices
  • StreamSink and StreamSource can focus on reading/writing CSV / JSON (/ Avro) data from streaming sources

Not only would this simplify the current code, but would also expand the streaming support in DataFusion

  • Allows for more efficient non-blocking IO, as Linux FIFOs support poll(2) (unlike general files)
  • Potential future integrations with data streaming systems such as Kafka, etc...
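To make the proposed split concrete, here is a minimal sketch using only the Rust standard library. The trait shapes and the helper types (`ListedFiles`, `LineSource`, `LineSink`, `pump`) are assumptions made for illustration; only the names FileScan, StreamSource and StreamSink come from the proposal, and none of this is the DataFusion API.

```rust
use std::io::{self, BufRead, Write};

/// Hypothetical: a bounded scan over a fixed set of immutable files.
trait FileScan {
    /// All files are known up front, so the scan can be planned and split.
    fn files(&self) -> &[String];
}

/// Hypothetical: an unbounded source that yields records as they arrive.
trait StreamSource {
    /// Returns Ok(None) once the stream is closed; may block until data arrives.
    fn next_record(&mut self) -> io::Result<Option<String>>;
}

/// Hypothetical: a sink that appends records to a stream.
trait StreamSink {
    fn write_record(&mut self, record: &str) -> io::Result<()>;
}

struct ListedFiles(Vec<String>);

impl FileScan for ListedFiles {
    fn files(&self) -> &[String] {
        &self.0
    }
}

/// Line-delimited source over any buffered reader (a FIFO, a socket, a test buffer).
struct LineSource<R: BufRead>(R);

impl<R: BufRead> StreamSource for LineSource<R> {
    fn next_record(&mut self) -> io::Result<Option<String>> {
        let mut line = String::new();
        if self.0.read_line(&mut line)? == 0 {
            return Ok(None); // zero bytes read means the writer side closed
        }
        Ok(Some(line.trim_end_matches(|c| c == '\n' || c == '\r').to_string()))
    }
}

/// Line-delimited sink over any writer.
struct LineSink<W: Write>(W);

impl<W: Write> StreamSink for LineSink<W> {
    fn write_record(&mut self, record: &str) -> io::Result<()> {
        self.0.write_all(record.as_bytes())?;
        self.0.write_all(b"\n")
    }
}

/// Copies every record from a source to a sink and returns the record count.
fn pump(src: &mut dyn StreamSource, sink: &mut dyn StreamSink) -> io::Result<usize> {
    let mut n = 0;
    while let Some(rec) = src.next_record()? {
        sink.write_record(&rec)?;
        n += 1;
    }
    Ok(n)
}
```

The point of the sketch is that the file side exposes a listing that is complete before reading starts, while the stream side only exposes "give me the next record", which is why the two are awkward to force behind one abstraction.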

Describe alternatives you've considered

No response

Additional context

No response

@tustvold tustvold added the enhancement New feature or request label Oct 30, 2023
@alamb
Contributor

alamb commented Oct 30, 2023

I would like to separate the notions of FileSink and FileScan from a StreamSink and StreamSource, this would allow abstractions that better fit their respective use-cases.

Sounds like a great idea to me.

@devinjdangelo
Contributor

+1 from me as well. The more finely we optimize batch/streaming reads and writes, the more the specific implementations will ideally diverge a bit. I felt this when working on parallelizing writes, as some code that works great for batch caused inescapable infinite loops for infinite sources, requiring special cases to be added to the code. Separating the two cases will allow both use cases to optimize appropriately without harming the other.

tustvold added a commit to tustvold/arrow-datafusion that referenced this issue Oct 31, 2023
tustvold added a commit to tustvold/arrow-datafusion that referenced this issue Nov 1, 2023
@tustvold
Contributor Author

tustvold commented Nov 1, 2023

I created a POC of ripping append semantics out of ListingTable in #8017 and the good news is this is very achievable 😄

@metesynnada
Contributor

metesynnada commented Nov 1, 2023

I, too, am in favor of this split; the StreamSink methodology's implementation significantly enhances the unified approach, maintaining the integrity of features on both ends. At the moment, my agenda is filled with high-priority tasks, but I am available to assist with code reviews and provide guidance on the overall structure of the code. Additionally, I will contribute some tests to the codebase to verify that we are preserving the streaming functionality. I believe this will also serve to optimize the DataFusion use cases. Thank you for your hard work, @tustvold, @alamb, and @devinjdangelo

Btw, if a Google Doc or any similar platform is accessible, I would be glad to contribute to it.

@tustvold
Contributor Author

tustvold commented Nov 1, 2023

Whilst discussing potential API designs with @alamb the question came up of whether the existing TableProvider abstractions are now sufficiently mature that we could just use them directly. In particular the combination of TableProvider::insert_into and TableProviderFactory I think may cover all the necessary extension points for people to implement streaming workloads as they deem fit.

I tried to show what this might look like in #8021 PTAL.

If this would be sufficient for the streaming use-cases, perhaps we can avoid needing to define further abstractions, and can simply remove the streaming support from ListingTable proper, i.e. a more polished version of #8017?
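As a rough illustration of why a provider trait plus a factory might already be enough of an extension point, here is a standard-library-only model. `SimpleTableProvider`, `SimpleTableFactory`, `InMemoryStreamTable`, `InMemoryStreamFactory` and the `fifo://events` location are all hypothetical stand-ins invented for this sketch; they are deliberately simplified and are not the real signatures of TableProvider::insert_into or TableProviderFactory.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

type Row = Vec<String>;

/// Simplified stand-in for a table provider: NOT the real DataFusion trait.
trait SimpleTableProvider {
    /// Reads everything currently in the table.
    fn scan(&self) -> Vec<Row>;
    /// Appends rows and returns how many were written.
    fn insert_into(&self, rows: Vec<Row>) -> usize;
}

/// Simplified stand-in for a provider factory, keyed by a location string.
trait SimpleTableFactory {
    fn create(&self, location: &str) -> Arc<dyn SimpleTableProvider>;
}

/// An in-memory append-only "stream" table used only for illustration.
struct InMemoryStreamTable {
    rows: Mutex<Vec<Row>>,
}

impl SimpleTableProvider for InMemoryStreamTable {
    fn scan(&self) -> Vec<Row> {
        self.rows.lock().unwrap().clone()
    }

    fn insert_into(&self, rows: Vec<Row>) -> usize {
        let n = rows.len();
        self.rows.lock().unwrap().extend(rows);
        n
    }
}

/// Factory that hands out one shared table per location.
#[derive(Default)]
struct InMemoryStreamFactory {
    tables: Mutex<HashMap<String, Arc<InMemoryStreamTable>>>,
}

impl SimpleTableFactory for InMemoryStreamFactory {
    fn create(&self, location: &str) -> Arc<dyn SimpleTableProvider> {
        let mut tables = self.tables.lock().unwrap();
        let table = tables
            .entry(location.to_string())
            .or_insert_with(|| {
                Arc::new(InMemoryStreamTable {
                    rows: Mutex::new(Vec::new()),
                })
            })
            .clone();
        table
    }
}
```

In this model a new streaming backend only has to supply its own provider and factory; nothing in the file-oriented code path needs to know about it, which is the property the comment above is asking about.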

@alamb
Contributor

alamb commented Nov 8, 2023

There is a lot of related discussion on #8021 (comment)

@metesynnada
Contributor

metesynnada commented Nov 11, 2023

I wrote an initial design document, @alamb and @tustvold , can you check this out? I have tried to cover all use cases while making it easier to develop the batch mode feature. The document allows commenting, so please share your thoughts 😀

@alamb
Contributor

alamb commented Nov 11, 2023

I wrote an initial design document, @alamb and @tustvold , can you check this out? I have tried to cover all use cases while making it easier to develop the batch mode feature. The document allows commenting, so please share your thoughts 😀

Thanks @metesynnada -- I will do so, likely later today or tomorrow

@tustvold
Contributor Author

tustvold commented Nov 11, 2023

Thank you for writing this up, before getting into specific questions about the design, I think it would help to articulate what the objectives here are? Some possibilities might be:

  1. To replicate the functionality currently provided by ListingTable
  2. To facilitate the addition of new streaming sources beyond FIFO files

From my reading of the document, the proposed abstractions appear to largely mirror the equivalent TableProvider abstractions used in #8021, whilst also quite closely fitting to the current FIFO file mechanism based around serialized byte streams. This makes me suspect that neither of the above is quite the vision here?

On a more concrete level, it occurs to me that if StreamStoreRegistry were moved under StreamingTableFactory, all of this functionality would be encapsulated under one "extension point". Perhaps this might provide a mechanism to iterate and evolve these APIs incrementally, e.g. in datafusion-contrib or similar, without needing to front-load the design effort? This would allow new streaming sources to be added, and the abstractions evolved as necessary, all in one place? I for one anticipate Kafka, and certainly Kinesis, will require some non-trivial iteration on these APIs to accommodate their particular quirks. Just a suggestion, but it might not only allow us to make progress on this ticket quicker, but also yield a better development experience for ongoing work on the streaming functionality, to not be split across repositories.

@alamb
Contributor

alamb commented Nov 11, 2023

I have read the document and I really like it. Thank you @metesynnada.

Use Case

My understanding is that the goal of this exercise is not primarily about the specifics of decoupling append/FIFO from ListingTable, but the more general goal of easily using DataFusion to build systems with functionality similar to Apache Flink, as described in #4285

In particular the goal is to make it easy to

  1. Read data from "streaming" sources
  2. Write data to "streaming" sinks

Where "streaming" means the data is NOT composed of a fixed set of immutable objects (as in object storage), but rather some number of "streams" which can be appended / consumed. Some examples are FIFO files, Kafka, RabbitMQ, Kinesis, etc.
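The distinction drawn above can be sketched with the standard library alone, using a channel as a stand-in for a stream. The function names (`sum_bounded`, `running_totals`, `demo_stream`) are made up for this example and the channel is only an analogy for sources such as FIFO files or Kafka.

```rust
use std::sync::mpsc;
use std::thread;

/// Bounded case: the full set of immutable "objects" is known before reading,
/// so a single final answer can be computed.
fn sum_bounded(objects: &[Vec<i64>]) -> i64 {
    objects.iter().flatten().sum()
}

/// Unbounded case: values arrive over time, so a running total is emitted per
/// value because, in general, there is no end of input to wait for.
fn running_totals(rx: mpsc::Receiver<i64>) -> Vec<i64> {
    let mut total = 0;
    let mut out = Vec::new();
    // `recv` blocks until a value arrives or every sender has been dropped.
    while let Ok(v) = rx.recv() {
        total += v;
        out.push(total);
    }
    out
}

/// Spawns a producer that appends three values and then closes the stream.
fn demo_stream() -> Vec<i64> {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        for v in [1, 2, 3] {
            tx.send(v).unwrap();
        }
        // `tx` is dropped here, which closes the stream for the consumer.
    });
    let totals = running_totals(rx);
    producer.join().unwrap();
    totals
}
```

The demo stream is closed on purpose so the function returns; a real streaming source may never close, which is exactly the case that code written for a fixed set of objects tends to handle badly.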

One of the major challenges in trying to implement such streaming systems today is that ListingTable, as its name implies, is implemented assuming the data is being read from or written to an object store, with both the benefits and limitations of the object_store API.

Design Feedback

I really like the idea of creating a StreamingTable, parallel to ListingTable, that is backed by a different API than object store and that can be used to build such streaming use cases. It much better matches:

  1. The differences in reading from a stream vs immutable discrete objects in object store
  2. The difference in writing data out in a streaming fashion compared to immutable discrete objects in object store

Requests / ideas

As we expand DataFusion in this streaming direction, I would like to request we take this opportunity to define some more crate boundaries so that ListingTable is not so tightly integrated / intertwined, and likewise StreamingTable is not so intertwined -- perhaps we can aim to end up with three new crates: datafusion_listing_table, datafusion_streaming_table and datafusion_data_format

Next steps:

Would it be possible to create a PoC / proposal with the basic APIs and make sure they fit together and into the rest of DataFusion? This is likely to be a large change, so I think getting the skeleton in place and then filling out the details in subsequent PRs (rather than one massive one) would be my preferred process.

@alamb
Contributor

alamb commented Nov 11, 2023

I think we can probably sketch out enough of this StreamingTable API to migrate the existing FIFO support pretty quickly. I would be happy to help / review quickly

@tustvold
Contributor Author

tustvold commented Nov 11, 2023

I would like to request that we don't overload the file format abstractions for streaming use-cases as that is part of the confusion I am trying to eliminate. There isn't a great deal of logic that can be shared anyway. Otherwise sounds good to me.

I still would favor not trying to front-load defining StreamingTable and instead allow it to form naturally as new backends are added. I am not confident that log-oriented stores like Kinesis and Kafka map well to the proposed API, and it seems premature to be designing an abstraction before having multiple implementations to abstract 😅

@alamb
Contributor

alamb commented Nov 11, 2023

I agree that starting with a basic StreamingTable abstraction, and copying over only the API features from the current ListingTable that are needed (and not replicating APIs until necessary), makes a lot of sense

I would like to request that we don't overload the file format abstractions for streaming use-cases as that is part of the confusion I am trying to eliminate.

Do you mean the FileFormat trait? At the moment, it seems like it is tightly tied to ObjectStore both in the infer methods and create_writer_physical_plan via FileSinkConfig so I agree it is not clear how to make this easily fit into the streaming design

My understanding is that the StreamingTable would have an analogous trait (like StreamFormat perhaps).
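One way to picture what such an analogous trait could look like is the sketch below. Only the name StreamFormat comes from the discussion; the `decode` method, the `SimpleCsvStream` type and the simplified CSV handling (no quoting or escaping) are assumptions for illustration. The key difference from a file-oriented format is that input arrives in arbitrary chunks, so a partial record has to be buffered until the rest shows up.

```rust
/// Hypothetical stream-side analogue of a file format: decodes records
/// incrementally from byte chunks rather than from a complete object.
trait StreamFormat {
    /// Feeds one chunk and returns every record completed by it.
    fn decode(&mut self, chunk: &[u8]) -> Vec<Vec<String>>;
}

/// Newline-delimited, comma-separated records (no quoting or escaping).
#[derive(Default)]
struct SimpleCsvStream {
    /// Bytes of a trailing partial line, kept until its newline arrives.
    pending: Vec<u8>,
}

impl StreamFormat for SimpleCsvStream {
    fn decode(&mut self, chunk: &[u8]) -> Vec<Vec<String>> {
        self.pending.extend_from_slice(chunk);
        let mut records = Vec::new();
        while let Some(pos) = self.pending.iter().position(|&b| b == b'\n') {
            // Take the line, including its newline, out of the buffer.
            let line: Vec<u8> = self.pending.drain(..=pos).collect();
            // Decode everything before the newline and split it into fields.
            let text = String::from_utf8_lossy(&line[..pos]);
            records.push(text.split(',').map(|f| f.trim().to_string()).collect());
        }
        records
    }
}
```

Feeding `b"a,1\nb,"` yields only the first record; the second one is returned once a later chunk supplies `2\n`. A decoder of this shape has no need for object listings or whole-file schema inference, which is part of why little logic would be shared with the file side.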

@ozankabak
Contributor

My understanding is that the StreamingTable would have an analogous trait (like StreamFormat perhaps).

Exactly.

I think we can probably sketch out enough of this StreamingTable API to migrate the existing FIFO support pretty quickly. I would be happy to help / review quickly

Thank you 🚀

We will start implementing this next week to get the core APIs and the functionality in place. Your help in fine-tuning the design through reviews during this process will be much appreciated. @metesynnada and I scoped out the work and it seems the implementation should go relatively smoothly.

@tustvold
Contributor Author

What do people think about perhaps polishing up #8021 into a v0.1 for StreamingTable? I believe most of the functionality is there, and I would be happy to do this.

Just thinking about ways I could help get this over the line...

@alamb
Contributor

alamb commented Nov 14, 2023

Thank you @tustvold -- I think that would be very helpful assuming @metesynnada / @ozankabak are not already working on it.

@ozankabak
Contributor

We started working on it and hope to get things ready for review this week. We will appreciate your help on reviewing it. Thank you @alamb and @tustvold

@tustvold
Contributor Author

tustvold commented Nov 14, 2023

Is there any value in starting small with something based off #8021? I'm confident I could have something ready for review this morning? This would allow us to take the time to get the v1 design right, whilst unblocking things in the meantime?

@ozankabak
Contributor

I want to avoid any possible overlap between the progress so far on our end and your time. Given that we will have something out this week, I think we can best utilize your time if you carefully review the upcoming PRs and help us optimize the design and code through that process. Once we have the basics out, there will also be some important work to remove leftover code in the ListingTable end to optimize it fully for bounded files, your help will be crucial there too.

@tustvold
Contributor Author

tustvold commented Nov 14, 2023

My time is immaterial, I just want to get this over the line. If it all gets replaced in 2 weeks, I would be more than happy.

As it stands now though, from my perspective, we have had a largely functionally complete PR that preserves the current functionality for weeks, and the approach was signed off by @metesynnada - #7994 (comment) 2 weeks ago. I am therefore I think pretty understandably frustrated at the prospect of yet further delay for reasons that haven't been articulated.

I would very much like to just get this over the line so that the work in DF and IOx that is blocked on it can move forward. If that means doing work that will be thrown away, so be it, it has mostly already been done.

@ozankabak
Contributor

ozankabak commented Nov 14, 2023

Dear @tustvold,

Just like you, we also would like to get to a state where everyone can move forward, as we have been negatively impacted by this situation. I would like to remind you that we did not create the situation we are all in. The breaking arrow-rs change, which is the root cause of the time pressure, was done without us knowing. We were only notified of it after you merged it.

The fact that we agreed on the general aspects of the approach to adopt in terms of the roadmap does not mean we committed to a very specific timeframe. If we are to discuss frustrations, we can contribute quite a few talking points. I don't want to go down that path, as it will not help remedy the situation. I invite you to consider doing the same.

Whatever happened, happened. We are all active contributors to this project, and regardless of the situation we should focus on finding productive ways to work together. @alamb and I engaged in such a discussion last week to chart a path forward, and we are acting in good faith to get to a resolution as quickly as possible. The first step was to come up with a proposal, and we did so in a timely manner. @alamb took a detailed look at it, and it seems we are on the same page w.r.t. the general approach within it. Now, we are in the process of implementing it. I have said before that we will have something out this week, and @alamb will help us with reviews and suggestions to move the ball forward quickly when it is out (#8151). We are making progress as a community with a focus on moving forward. Please, I request from you to not exacerbate the situation needlessly. If you would like to help, we do welcome your help -- I already mentioned some possible ways how.

DataFusion is a great project with a huge role to play in the data ecosystem if we are able to form and maintain a cohesive community around it. I hope we don't lose sight of this due to temporary roadblocks like the one we are going through right now.

@tustvold
Contributor Author

tustvold commented Nov 14, 2023

I have already updated #8021 in line with the design proposal; if this is acceptable I would very much like to consider moving forwards with it as a stopgap. I leave the rest with you

If we are to discuss frustrations, we can contribute quite a few talking points.
I request from you to not exacerbate the situation needlessly

I apologise if that is how it has come across, that has not been my intention; I purely wish to keep things moving forward. I did not anticipate this change being any more fraught than the fairly frequent breaking changes we routinely make. Change is necessary but is disruptive, and I could have done a better job communicating this.

@ozankabak
Contributor

Thank you, I appreciate you sharing your reflection publicly. I am glad that we are able to come together and collaborate towards a solution. @metesynnada and I are reviewing #8021 to see how we can use it as a base to accelerate the work towards implementing the proposal. We will circle back shortly.

tustvold added a commit that referenced this issue Nov 15, 2023
* Implement FIFO using extension points (#7994)

* Clippy

* Rename to StreamTable and make public

* Add StreamEncoding

* Rework sort order

* Fix logical conflicts

* Format

* Add DefaultTableProvider

* Fix doc

* Fix project sort keys and CSV headers

* Respect batch size on read

* Tests are updated

* Resolving clippy

---------

Co-authored-by: metesynnada <100111937+metesynnada@users.noreply.github.com>
@tustvold tustvold self-assigned this Nov 15, 2023
tustvold added a commit to tustvold/arrow-datafusion that referenced this issue Nov 15, 2023
tustvold added a commit to tustvold/arrow-datafusion that referenced this issue Nov 15, 2023
tustvold added a commit that referenced this issue Nov 17, 2023
* Remove FileWriterMode Support (#7994)

* Don't ignore test

* Error on insert to single file

* Improve DisplayAs
tustvold added a commit that referenced this issue Nov 18, 2023
* POC: Remove ListingTable Append Support (#7994)

* Prepare object_store 0.8.0

* Fix datafusion-cli test

* Update arrow version

* Update tests

* Update pin

* Unignore fifo test

* Update lockfile