Document streaming usecase (like UNBOUNDED tables) #9016

Open
3 tasks
alamb opened this issue Jan 27, 2024 · 13 comments

@alamb
Contributor

alamb commented Jan 27, 2024

Is your feature request related to a problem or challenge?

Someone asked in discord:

I'm looking at Datafusion and Polars as potential solutions for calculating averages over a sliding window of events, where the window is bound by event time. I've just come across Datafusion, would anyone be able to clarify if it's suitable for this use case? In essence, I have events streaming in via RPC that I want to feed into a system that gives the above outcome.

I am pretty sure this is exactly the case for using UNBOUNDED tables with an explicitly defined ORDER BY, as contributed by Synnada, Arroyo, and others. However, when I went to look for the documentation, I couldn't find any mention of this use case or any documentation of unbounded tables.
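
To make the use case from the quoted question concrete, here is a minimal plain-Rust sketch of an average over a sliding window bounded by event time. It does not use DataFusion at all; `Event`, `SlidingAvg`, and the window width of 10 are hypothetical names and values chosen for illustration. It assumes events arrive in non-decreasing event-time order, which is roughly the guarantee that declaring an ordering on an unbounded table is meant to express.

```rust
use std::collections::VecDeque;

/// Hypothetical event: an event-time timestamp (in seconds) and a value.
#[derive(Debug, Clone, Copy)]
struct Event {
    ts: i64,
    value: f64,
}

/// Average over the events whose timestamp lies in (latest_ts - width, latest_ts].
/// Assumption: events are pushed in non-decreasing `ts` order.
struct SlidingAvg {
    width: i64,
    window: VecDeque<Event>,
    sum: f64,
}

impl SlidingAvg {
    fn new(width: i64) -> Self {
        Self { width, window: VecDeque::new(), sum: 0.0 }
    }

    /// Add one event and return the average of the current window.
    fn push(&mut self, ev: Event) -> f64 {
        self.window.push_back(ev);
        self.sum += ev.value;
        // Evict events that fell out of the window, judged by event time
        // rather than by arrival time or row count.
        while let Some(&front) = self.window.front() {
            if front.ts <= ev.ts - self.width {
                self.sum -= front.value;
                self.window.pop_front();
            } else {
                break;
            }
        }
        // The window always contains at least the event just pushed.
        self.sum / self.window.len() as f64
    }
}

fn main() {
    let mut avg = SlidingAvg::new(10);
    for (ts, value) in [(0, 1.0), (5, 3.0), (12, 5.0), (30, 7.0)] {
        let current = avg.push(Event { ts, value });
        println!("ts={ts} avg={current}");
    }
}
```

Because the input is ordered, the window only needs to remember the events still inside it, so memory stays bounded even though the stream never ends.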

Describe the solution you'd like

I would like to help make it easier for people to use DataFusion for streaming usecases by:

Describe alternatives you've considered

No response

Additional context

No response

alamb added the enhancement (New feature or request) label on Jan 27, 2024
@alamb
Contributor Author

alamb commented Jan 27, 2024

@metesynnada @mustafasrepo @ozankabak or @edmondop @mwylde do you know of any existing documentation / examples we could adapt?

@alamb
Contributor Author

alamb commented Jan 27, 2024

I think the major initial push in this area came in #4694

@Tangruilin
Contributor

I can help with it.

Assign to me. @alamb

@edmondop
Contributor

Would one need to write custom unbounded sources?

@alamb
Contributor Author

alamb commented Jan 27, 2024

Would one need to write custom unbounded sources?

I don't think so @edmondop -- I was thinking anything that gives others examples / help starting would be great. Maybe we can start with some SQL reference / API docs and high level commentary in one PR and then add an example as another PR

Right now there is nothing documented, so it will be very easy to improve the status quo!

@trungda
Contributor

trungda commented Jan 29, 2024

I've realized that StreamEncoding is not supported for Parquet. Is it intentional?

@leoyvens
Contributor

leoyvens commented Feb 8, 2024

The example on #9070 is great for understanding how to track event time with GROUP BY. I have a question that this doc effort could help answer: Is there any trick out there for watermarking, that is, a way for a table to emit a signal that all events at a given event time have been sent?

@alamb
Contributor Author

alamb commented Feb 8, 2024

Is there any trick out there for watermarking, that is, a way for a table to emit a signal that all events at a given event time have been sent?

The way most of the code in DataFusion works is that it uses the next distinct value in the data to trigger emission (that is, you have to see an event with the next timestamp before results for the current one are emitted)

I don't know of any way to send a synthetic signal that says "will never see any more values in this time interval"
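
As a rough illustration of that behavior, here is a plain-Rust sketch (not DataFusion code) of a streaming per-timestamp average over input that is sorted by event time. `OrderedGroupAvg` and its methods are hypothetical names. A group is emitted only when a row with a different timestamp arrives, and the last group is emitted only when the input ends, which is why an unbounded source with no further rows never releases its final group.

```rust
/// Streaming average per event-time key over input sorted by key.
/// Assumption: rows with the same key are contiguous (input is ordered).
struct OrderedGroupAvg {
    /// The group currently being accumulated: (key, sum, count).
    current: Option<(i64, f64, u64)>,
}

impl OrderedGroupAvg {
    fn new() -> Self {
        Self { current: None }
    }

    /// Feed one row. Returns a finished (key, average) only if this row
    /// carries a new key and therefore closes the previous group.
    fn push(&mut self, key: i64, value: f64) -> Option<(i64, f64)> {
        if let Some((k, sum, count)) = self.current.as_mut() {
            if *k == key {
                // Same key: keep accumulating, nothing can be emitted yet.
                *sum += value;
                *count += 1;
                return None;
            }
        }
        // A different key arrived: the previous group (if any) is complete.
        let finished = self.finish();
        self.current = Some((key, value, 1));
        finished
    }

    /// Emit the pending group. Only meaningful when the input has ended.
    fn finish(&mut self) -> Option<(i64, f64)> {
        self.current
            .take()
            .map(|(k, sum, count)| (k, sum / count as f64))
    }
}

fn main() {
    let rows = [(1, 10.0), (1, 20.0), (2, 5.0), (3, 7.0)];
    let mut agg = OrderedGroupAvg::new();
    for (ts, value) in rows {
        if let Some((key, avg)) = agg.push(ts, value) {
            println!("emit ts={key} avg={avg}");
        }
    }
    // The group for ts=3 is still pending: no later row has closed it.
    if let Some((key, avg)) = agg.finish() {
        println!("emit at end of input ts={key} avg={avg}");
    }
}
```

A watermark would amount to an extra way of calling something like `finish` without seeing a later row, which is the part that has no counterpart in the description above.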

@ozankabak
Contributor

I don't know of any way to send a synthetic signal that says "will never see any more values in this time interval"

Yes, IMO this seems to fall outside the scope of "upstream" DataFusion (we also face these challenges, but for this reason we choose to solve them downstream).

@milenkovicm
Contributor

A bit late to this party :), a few questions regarding this topic:

  1. I would say that UNBOUNDED is intended to be used with queries like INSERT INTO kafka_table_1 SELECT * FROM kafka_table_2, do I understand this correctly?
  2. if 1 is correct, I believe we would have to detach ctx.sql(...) from the main thread so that it does not block other statements, so the question is who will be in charge of query cancellation? Is it (or would it be) part of DataFusion, does it have to be implemented as part of the source, or does it live somewhere else?

thanks a lot

@alamb
Contributor Author

alamb commented Apr 10, 2024

2. if 1 is correct, I believe we would have to detach ctx.sql(...) from the main thread so that it does not block other statements, so the question is who will be in charge of query cancellation? Is it (or would it be) part of DataFusion, does it have to be implemented as part of the source, or does it live somewhere else?

I think the execution is run as a separate tokio task -- so whenever someone wants to see if the next result is available they would do stream.next().await or something equivalent

@milenkovicm
Contributor

wrt point 2: if we have a running unbounded pipeline, then when a shutdown/cancel event comes, sources should stop producing, probably commit offsets and so on, and give the rest of the pipeline a chance to finish everything in flight before shutdown.

so if i can reformulate my question 2, "who will inform source that it should wrap up for this execution and shut down ? :)"

or am I missing something obvious here?

@alamb
Contributor Author

alamb commented Apr 11, 2024

so if i can reformulate my question 2, "who will inform source that it should wrap up for this execution and shut down ? :)"

I think this needs to be handled by the containing system (nothing in DataFusion will do it directly). It would have to hold on to some reference to the source that can be signaled to stop.

In terms of shutdown, when the stream is dropped, it will shut down all resources attached to the executing plan. This is described a bit here https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#cancellation--aborting-execution. However, this doesn't do anything to save state as would be required in a streaming system
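
A standard-library analogy for both points, offered as a sketch only: it uses threads and channels rather than DataFusion or tokio, and `spawn_source`, `StopReason`, and the `stop` flag are hypothetical names. The containing system keeps a handle it can signal for a graceful stop, while simply dropping the receiving end (comparable to dropping the stream) makes the producer stop without any chance to save state.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, Receiver};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Why the hypothetical source stopped producing.
#[derive(Debug, PartialEq)]
enum StopReason {
    /// The containing system set the stop flag (graceful shutdown).
    Signaled,
    /// The consumer dropped the receiving end (like dropping the stream).
    ConsumerDropped,
}

/// Spawn an endless source on its own thread. Returns the receiving end,
/// the stop flag held by the containing system, and a join handle.
fn spawn_source() -> (Receiver<u64>, Arc<AtomicBool>, JoinHandle<StopReason>) {
    // Bounded channel: the source blocks when the consumer falls behind.
    let (tx, rx) = sync_channel(1);
    let stop = Arc::new(AtomicBool::new(false));
    let stop_in_source = Arc::clone(&stop);
    let handle = thread::spawn(move || {
        let mut next = 0u64;
        loop {
            if stop_in_source.load(Ordering::SeqCst) {
                // A real source would commit offsets or flush state here.
                return StopReason::Signaled;
            }
            if tx.send(next).is_err() {
                // Receiver is gone: nobody is left to consume, so just stop.
                return StopReason::ConsumerDropped;
            }
            next += 1;
        }
    });
    (rx, stop, handle)
}

fn main() {
    // 1. Graceful shutdown: signal the source, then drain what is in flight.
    let (rx, stop, handle) = spawn_source();
    let first: Vec<u64> = rx.iter().take(3).collect();
    stop.store(true, Ordering::SeqCst);
    let in_flight = rx.iter().count(); // ends once the source drops its sender
    println!("got {first:?}, drained {in_flight} in-flight item(s)");
    println!("source stopped: {:?}", handle.join().unwrap());

    // 2. Abrupt cancellation: drop the receiving end.
    let (rx, _stop, handle) = spawn_source();
    let _ = rx.recv();
    drop(rx);
    println!("source stopped: {:?}", handle.join().unwrap());
}
```

In the first case the source gets a chance to wrap up; in the second it only notices that nobody is listening, which mirrors the gap described above between cancelling execution and saving streaming state.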
