Add position delete filter and utils #1301

Conversation
```java
import org.apache.iceberg.util.StructLikeWrapper;

public class Deletes {
  private static final Schema POSITION_DELETE_SCHEMA = new Schema(
```
We can move this to a better place later, but it isn't available elsewhere so it makes sense to add it here.
rymurr left a comment
LGTM, I enjoyed the streaming sort algorithm :-)
Something for later: a benchmark to understand how large delete files can be before the filter cost explodes.
Agreed. We might want to read all of the delete files at once and keep the records in memory if the set is small enough. We will definitely want to look into that for the vectorized case.
Let me go through this now.
```java
public class Deletes {
  private static final Schema POSITION_DELETE_SCHEMA = new Schema(
      Types.NestedField.required(1, "filename", Types.StringType.get(), "Data file location of the deleted row"),
```
Why filename and not file_path like we use for DataFile?
Didn't think about it. file_path sounds good to me.
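For reference, a sketch of how the schema might read after the rename. The second field and its ID are an assumption based on the Long-typed POSITION_ACCESSOR used later in the diff, not a quote of the final code:

```java
// Sketch only: position delete schema after renaming "filename" to "file_path".
// The "pos" field and its ID are assumed, inferred from POSITION_ACCESSOR below.
private static final Schema POSITION_DELETE_SCHEMA = new Schema(
    Types.NestedField.required(1, "file_path", Types.StringType.get(),
        "Data file location of the deleted row"),
    Types.NestedField.required(2, "pos", Types.LongType.get(),
        "Ordinal position of the deleted row in the data file"));
```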
```java
  return new SortedMerge<>(Long::compare, positions);
}

private static class PositionDeleteFilter<T> extends CloseableGroup implements CloseableIterable<T> {
```
Does this mean we will filter out positions after we read data and project meta columns? Do we plan to push this down to readers in the future? How will it work with vectorized execution?
This filter doesn't require us to answer most of those questions right now. All it requires is some object and a way to get the position of that object. That should be flexible enough that we can handle position as a column or using a class that directly supports it.
For the initial integration with Spark, I was thinking that we would add the _pos metadata column at the end of the requested projection. That way, we can return the rows without copying the data. At most, we would need to tell the row to report that it is one column shorter.
The main idea here is to make it easy for readers to create filters for tasks and apply them. The reader just needs to open the delete files, then pass them to these methods to merge deletes together and use them as a row filter.
For vectorization, we will probably want a different implementation, but we could reuse some of these classes (like the merging iterator).
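To make that contract concrete, here is a minimal, self-contained sketch of the streaming idea: rows plus a position-extraction function, filtered against an ascending stream of delete positions. Names and shape are illustrative, not the PR's exact classes:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class StreamingPositionFilterSketch {
  // Keep rows whose position does not appear in the delete stream.
  // Assumes both row positions and delete positions are ascending.
  static <T> List<T> filterDeleted(Iterator<T> rows, Function<T, Long> extractPos, Iterator<Long> deletes) {
    List<T> kept = new ArrayList<>();
    long nextDelete = deletes.hasNext() ? deletes.next() : Long.MAX_VALUE;
    while (rows.hasNext()) {
      T row = rows.next();
      long pos = extractPos.apply(row);
      // advance the delete cursor until it is at or past the current row
      while (nextDelete < pos && deletes.hasNext()) {
        nextDelete = deletes.next();
      }
      if (nextDelete < pos) {
        nextDelete = Long.MAX_VALUE; // delete stream exhausted
      }
      if (pos != nextDelete) {
        kept.add(row);
      }
    }
    return kept;
  }
}
```

Because both streams are consumed in order, each delete position is visited at most once, so the cost is linear in the number of rows plus deletes.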
```java
@Override
protected boolean shouldKeep(T row) {
  long currentPos = extractPos.apply(row);
```
Does this assume rows are ordered by position?
Yes, it assumes that positions are ascending, like the delete positions.
```java
List<CloseableIterable<Long>> positions = Lists.transform(deleteFiles, deletes ->
    CloseableIterable.transform(locationFilter.filter(deletes), row -> (Long) POSITION_ACCESSOR.get(row)));

return new SortedMerge<>(Long::compare, positions);
```
One of the assumptions was that positional deletes are lightweight and we can build an in-memory map from file_path to a set of deleted positions for a given task. If I understand correctly, the current logic will scan deleteFiles for every data file.
Do we consider merge sort as a primary way of applying positional deletes or as a fallback when the number of delete files is too large?
That's right. This implementation is the streaming one. It should be simpler to build the version that caches deletes as a set.
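A minimal sketch of that cached variant, assuming the deletes for one data file fit in memory (names are illustrative, not the PR's API):

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

public class CachedPositionFilterSketch {
  // Materialize all delete positions for one data file into a set,
  // then filter rows by membership. No ordering requirement, but the
  // whole delete set must fit in memory.
  static <T> Predicate<T> keepRow(Iterator<Long> deletePositions, Function<T, Long> extractPos) {
    Set<Long> deleted = new HashSet<>();
    deletePositions.forEachRemaining(deleted::add);
    return row -> !deleted.contains(extractPos.apply(row));
  }
}
```

The trade-off against the streaming merge is memory for the set versus a per-data-file scan of the delete files.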
This looks good to me too. The build failed, though.
aokolnychyi left a comment
+1 when the tests pass
Thanks for reviewing @rymurr, @aokolnychyi!
This adds a position delete row filter and helpers for constructing a row filter that restricts deletes to a specific data file and merges deletes from multiple delete files. These utilities are written using CloseableIterable and handle file closing.
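The merge step relies on a k-way sorted merge (SortedMerge in the diff). Here is a self-contained sketch of that pattern, with illustrative names and without the CloseableIterable plumbing:

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

public class SortedMergeSketch {
  // K-way merge of already-sorted iterators using a min-heap keyed on
  // each source's current head, analogous to SortedMerge over per-file
  // delete position streams.
  static <T> Iterator<T> merge(Comparator<T> comparator, List<Iterator<T>> inputs) {
    PriorityQueue<PeekingSource<T>> heap =
        new PriorityQueue<>(Math.max(1, inputs.size()), (a, b) -> comparator.compare(a.head, b.head));
    for (Iterator<T> input : inputs) {
      if (input.hasNext()) {
        heap.add(new PeekingSource<>(input));
      }
    }
    return new Iterator<T>() {
      @Override
      public boolean hasNext() {
        return !heap.isEmpty();
      }

      @Override
      public T next() {
        if (heap.isEmpty()) {
          throw new NoSuchElementException();
        }
        PeekingSource<T> source = heap.poll();
        T value = source.head;
        if (source.advance()) {
          heap.add(source); // re-insert with its new head
        }
        return value;
      }
    };
  }

  private static class PeekingSource<T> {
    private final Iterator<T> iter;
    private T head;

    PeekingSource(Iterator<T> iter) {
      this.iter = iter;
      this.head = iter.next(); // only constructed for non-empty iterators
    }

    boolean advance() {
      if (iter.hasNext()) {
        this.head = iter.next();
        return true;
      }
      return false;
    }
  }
}
```

A caller would combine per-file position streams with Long::compare, mirroring new SortedMerge<>(Long::compare, positions) in the diff.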