
[Feature Request] Enable ability to read stream from delta table without duplicates. #1490

Closed · 1 of 3 tasks
GrigorievNick opened this issue Nov 19, 2022 · 12 comments
Labels: enhancement (New feature or request)

Comments

GrigorievNick (Contributor) commented Nov 19, 2022

Feature request

Overview

Enable the ability to read a stream from a Delta table without duplicates.

Motivation

While there are many techniques for dealing with duplicates in a stream, such as dirty partitioning, SCD(2), or ingesting from CDC events, they bring costly overhead from a CPU/memory perspective, especially for big data workloads.

But there are many business use cases where updates/overwrites/deletes can be separated from appends of new data.
These are mostly streams of facts, such as IoT metrics or user activity, which are essentially immutable; there are only a few cases when this data must be updated or deleted:

  • Some data/table layouts require row-level deletes to comply with GDPR/CCPA/other regulations.
  • In 99% of cases, update/overwrite is not business logic but the technical implementation of reprocessing data after failures or dirty data.

As you may notice, these are pretty rare events in comparison to near real-time data ingestion.
So if data engineers can consume a stream of the append-only data and treat all updates as an exceptional case, they can achieve a significant cost reduction in their data pipelines.

It is obvious that separating append-only ingestion from reruns brings additional complexity to the system. But for a lot of data pipelines and businesses, keeping the cost per event low is worth that complexity.

P.S. Iceberg has a similar feature. It's not really a fair comparison, because Iceberg is not a transaction log and did not support CDC before format version 2.0. But still, I think Delta could support append-only streams as first-class citizens, and not only through CDC.

Further details

Suggested technical implementation:
Add an append-only option to the delta.io stream source.

  1. With this option, the Delta source will ignore any commit except blind appends, using the isBlindAppend flag from CommitInfo to filter out commits that are not blind appends.
  2. With this option, the Delta source will ignore any commit that contains add and remove file actions simultaneously.

The first approach is dead simple but will ignore some interesting cases, such as a MERGE operation that only inserts data.
The second approach will handle MERGE, but from my point of view it can make the API less clear, because people may expect that new data added during an OVERWRITE operation will be available through the stream source. Also, it's not 100% clear what the behavior should be when a DELETE is followed by a MERGE with only inserts; this may lead to logical duplicates in some cases.
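
A minimal sketch of how the first option might look from the consumer's side; the appendOnly option name and the table path are illustrative placeholders, not an existing Delta API:

```scala
// Hypothetical usage of the proposed option: the source would skip any commit
// whose CommitInfo action does not carry isBlindAppend = true, so only blind
// appends reach the stream.
val factsStream = spark.readStream
  .format("delta")
  .option("appendOnly", "true") // proposed option, name is illustrative only
  .load("/path/to/facts_table") // placeholder path
```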

Willingness to contribute

The Delta Lake Community encourages new feature contributions. Would you or another member of your organization be willing to contribute to the implementation of this feature?

  • Yes. I can contribute this feature independently.
  • Yes. I would be willing to contribute this feature with guidance from the Delta Lake community.
  • No. I cannot contribute this feature at this time.
GrigorievNick added the enhancement (New feature or request) label on Nov 19, 2022
GrigorievNick (Contributor, Author) commented:

I plan to create an MR for the first option later today.
At my company, we have already been using it in production for the last two weeks.

GrigorievNick (Contributor, Author) commented:

Gentle ping, any comments on this?

allisonport-db (Collaborator) commented:

Just to confirm, are you aware of the Delta Change Data Feed feature? It seems to me like you could use a CDF stream to accomplish your use case by simply filtering for only "insert" rows (which would basically fulfill option (2) that you've outlined above).
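
For reference, a sketch of that CDF-based approach, assuming the table already has Change Data Feed enabled and using a placeholder path; the change-type column exposed by Delta CDF is _change_type:

```scala
import org.apache.spark.sql.functions.col

// Stream the table's Change Data Feed and keep only rows marked as inserts,
// dropping update_preimage/update_postimage/delete rows.
val insertsOnly = spark.readStream
  .format("delta")
  .option("readChangeFeed", "true")
  .load("/path/to/facts_table") // placeholder path
  .filter(col("_change_type") === "insert")
```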

GrigorievNick (Contributor, Author) commented Nov 29, 2022

Yes, I am aware of it, but that's not quite the point.
Yes, it's possible to implement the same logic with Change Data Feed plus a filter for only inserts.
But the performance characteristics are very different.
CDF requires:

  1. Additional I/O when writing data to the Delta table.
  2. Additional processing time to read the feed.
  3. Additional storage for the feed data.

While the proposed solution gives the ability to:

  • skip a commit completely
  • avoid any additional overhead on writes

In my case, with petabytes of data, the cost reduction is very significant.
The reduction in latency and driver memory needed to create a batch offset is also very significant.

allisonport-db (Collaborator) commented:

(2) shouldn't be a significant difference with CDF, by the way (and the others largely depend on the workload).

Regardless, it sounds like the semantics of what you're proposing are unclear in multiple different edge cases (for example, logical blind appends vs. insert-only operations). We would need to clarify the exact semantics and also see if this is something others in the community would use.

But first I think it would be helpful to see some performance numbers w.r.t CDF to justify adding a new feature when it can be done with CDF.

GrigorievNick (Contributor, Author) commented Nov 30, 2022

But first I think it would be helpful to see some performance numbers w.r.t CDF to justify adding a new feature when it can be done with CDF.

I am not sure I will be able to do such a comparison at my volume of data.
For example, when applying GDPR to my dataset, more than 100k files were modified.
So the last time I tried to enable CDF, I got stuck with too many slowdowns.
This can actually be fixed together with S3 support, so that they fix the pattern of how data is partitioned in my buckets.
Another issue with testing CDF is that I need to get a budget approved.

And on a small amount of data, I am not sure that the difference will be obvious.

We would need to clarify the exact semantics and also see if this is something others in the community would use.

I don't get your point.
I proposed two different semantics, and they have different behaviors.
In our system, we chose the blindAppend approach.
So please provide details or questions about what additional clarification is required.

P.S.:
My main argument: I don't want any additional cost, and I also want to use the stream API without any duplicates.

As I mentioned in the issue description:

  1. All my data is facts; in this use case, 99.9% of the operations are blind appends, and there are very few maintenance operations that update this data.
  2. Besides that, cost per record is the most important KPI for this data pipeline.

So I just want the ability to stream all these appends from the table and ignore any update/delete/overwrite commits, without any additional overhead.

allisonport-db (Collaborator) commented:

If your use case is 99.9% blind appends, the overhead of CDF should actually be close to none. The way CDF works, no additional data files are written for commits that are plain appends.

CDF requires:

  1. Additional I/O when writing data to the Delta table.
  2. Additional processing time to read the feed.
  3. Additional storage for the feed data.

For your workload, the above should not be true, given how CDF is implemented.

Given CDF provides a working solution and should have little to no overhead for your use case, it doesn't make sense to add a whole new feature. If this is not the performance you are seeing with CDF enabled, then this is where some performance numbers would really help (and we would want to address why you are seeing significant slowdowns with CDF enabled).

Happy to discuss in more depth how CDF works if you would like, and here's the design doc if you would like to take a look.
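
For completeness, a sketch of how CDF is typically turned on for an existing table; the table name is a placeholder:

```scala
// Enable Change Data Feed on an existing Delta table (placeholder name).
// Plain append commits continue to write only the regular data files;
// extra change-data files are produced only for rows touched by
// UPDATE/DELETE/MERGE.
spark.sql(
  "ALTER TABLE facts_table SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
)
```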

GrigorievNick (Contributor, Author) commented Dec 1, 2022 via email

GrigorievNick (Contributor, Author) commented:

Given CDF provides a working solution and should have little to no overhead for your use case

From the design doc that you mentioned:

Basic exploratory modeling suggests the performance cost of an extra read pass is only ~25% the cost of rewriting the full table, or ~20% of the query cost when our first pass scan to collect touched files is accounted for.

I find a 20% overhead very significant.

tdas (Contributor) commented Dec 1, 2022

The numbers you are referring to from the design doc compare different approaches for writing the CDF data. The exact number you quoted is for a rejected approach to writing CDF. The correct overheads to consider are as follows:

  1. Does enabling CDF increase DELETE/UPDATE/MERGE write times?
    The answer is that the increase is mostly really small. A typical delete/update/merge operation modifies a small fraction of the rows in the table. Due to the copy-on-write approach, this small number of modified rows causes a lot of files to be rewritten, which takes the bulk of the operation's duration. For example, modifying 10 rows can cause 10 files with 10M rows to be rewritten. When you enable CDF, this relatively small number of modified rows is duplicated and written to a separate set of files, which increases the file rewrite time by only a tiny fraction compared to running without CDF enabled (that is, writing 10M + 10 rows compared to 10M rows in the earlier example). So usually CDF has very little additional cost.

  2. Does using CDF increase the streaming read cost in a mostly-append workload?
    Appends/inserts don't produce separate CDF files. CDF streaming reads use the main data files and treat their rows as "inserted". For the 1% of non-appends, if you want the modified rows, they are already in separate CDF files that can be read efficiently. If you don't care about modified rows, then you can simply put a filter in your query (_change_type = "insert") and the filtering should have pretty low overhead within that 1% of non-inserts.

So unless you have actually tested your workload and have concrete numbers showing that either the write or read overhead caused by enabling CDF is too high, I really think CDF is the right approach.

Footnote: we are aware of the massive baseline cost of copy-on-write. That's why we are building Deletion Vector-based, write-optimized update/delete/merge.

GrigorievNick (Contributor, Author) commented:

OK, thanks for the detailed explanation. I will come back with concrete numbers later.

allisonport-db (Collaborator) commented:

Hey, I think this was done in #1616 and should suffice for your use case. Closing this now; feel free to re-open if you see fit.
