feat(scan): support data evolution row ID range filter #207
XiaoHongbo-Hope wants to merge 23 commits into
We can also support querying _ROW_ID in this PR.
d836c64 to d20243e
Also, please do not reference the issue in commit messages; every such commit adds noise to the issue's timeline.
d140ab8 to c4a34e7
👌
8145479 to 8728044
ed544fd to 7196a74
0803575 to 2cf0c78
6ab9e31 to c23c392
…n and append_null_row_id_column
…Selection instead of skipping IO filtering
- Replace post-read filter approach with pre-computed selected row ID sequence
- RowSelection is always applied at Parquet level for IO optimization
- Row IDs are assigned from the pre-computed sequence matching RowSelection output
- Extract insert_column_at to deduplicate column insertion logic
- Empty row_ranges treated as None (no filtering)
- Use saturating_add to prevent overflow in merge_row_ranges and build_row_ranges_selection
- Compute row_ranges before moving file_group to avoid clone
- Remove arrow-select dependency (no longer needed)
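To illustrate the idea behind applying a row selection at the Parquet level, here is a minimal sketch of turning inclusive row ID ranges into alternating skip/select runs for a single file. It is not the PR's `build_row_ranges_selection`: the `Run` enum, the `build_runs` name, and the public `from`/`to` fields are hypothetical stand-ins, loosely modeled on the skip/select runs a Parquet row selection is built from. It assumes the ranges are already sorted and merged and that row IDs are non-negative.

```rust
/// Inclusive row ID range (hypothetical stand-in for the PR's `RowRange`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowRange {
    pub from: i64,
    pub to: i64,
}

/// Local stand-in for one skip/select run; not the Parquet crate's own type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Run {
    Skip(usize),
    Select(usize),
}

/// Convert sorted, non-overlapping ranges into runs covering one file's rows.
/// Assumes `first_row_id >= 0` so the offset arithmetic cannot overflow.
pub fn build_runs(first_row_id: i64, row_count: i64, ranges: &[RowRange]) -> Vec<Run> {
    let mut runs = Vec::new();
    if row_count <= 0 {
        return runs;
    }
    let file_end = first_row_id.saturating_add(row_count - 1);
    let mut next = 0i64; // offset of the first row not yet covered by a run
    for r in ranges {
        // Clamp the range to the rows this file actually contains.
        let from = r.from.max(first_row_id);
        let to = r.to.min(file_end);
        if from > to {
            continue;
        }
        let start = (from - first_row_id).max(next);
        let end = to - first_row_id; // inclusive offset
        if end < start {
            continue;
        }
        if start > next {
            runs.push(Run::Skip((start - next) as usize));
        }
        runs.push(Run::Select((end - start + 1) as usize));
        next = end + 1;
    }
    // Skip whatever remains after the last selected row.
    if next < row_count {
        runs.push(Run::Skip((row_count - next) as usize));
    }
    runs
}
```

Because the runs are computed before reading, the reader can avoid decoding skipped rows, which is the IO benefit the commit message refers to.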
e894feb to 435854c
24fa539 to bfc11c0
    self
}

pub fn is_row_id_field(&self) -> bool {
Remove this.
Removed
    self.to - self.from + 1
}

pub fn is_empty(&self) -> bool {
This is redundant; there is already a debug_assert!(from <= to);.
👌
Removed
e7cd932 to d90879e
luoyuxia left a comment
@XiaoHongbo-Hope Thanks. I left some minor comments.
// With data predicates, merged_row_count() reflects pre-filter row counts,
// so stopping early could return fewer rows than the limit after filtering.
- let splits = if self.data_predicates.is_empty() {
+ let splits = if self.data_predicates.is_empty() && self.row_ranges.is_none() {
nit: the comment should also be updated to mention the newly added row_ranges condition.
Added
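The reasoning behind that condition can be sketched as follows. This is only an illustration under assumed names: `Split`, `apply_limit`, and the two boolean flags are hypothetical and stand in for the planner state shown in the diff. The point is that stopping early on cumulative row counts is only safe when nothing (neither data predicates nor row ranges) will drop rows later.

```rust
/// Hypothetical stand-in for a planned split; only its pre-filter row count matters here.
#[derive(Debug)]
pub struct Split {
    pub row_count: u64,
}

/// Keep just enough splits to satisfy `limit`, but only when no later filter can
/// remove rows. With predicates or row ranges, the counts are pre-filter counts,
/// so truncating here could yield fewer rows than the limit.
pub fn apply_limit(
    splits: Vec<Split>,
    limit: Option<u64>,
    has_data_predicates: bool,
    has_row_ranges: bool,
) -> Vec<Split> {
    let Some(limit) = limit else {
        return splits;
    };
    if has_data_predicates || has_row_ranges {
        return splits;
    }
    let mut total = 0u64;
    let mut kept = Vec::new();
    for s in splits {
        if total >= limit {
            break;
        }
        total = total.saturating_add(s.row_count);
        kept.push(s);
    }
    kept
}
```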
impl RowRange {
    pub fn new(from: i64, to: i64) -> Self {
        debug_assert!(from <= to, "RowRange from ({from}) must be <= to ({to})");
debug_assert! won't prevent an invalid RowRange in release builds.
Either use assert! or return Result<Self>.
Thanks, updated
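For reference, the `Result`-returning option could look roughly like the sketch below. The thread does not show which option the PR adopted, and the `InvalidRowRange` error type is a hypothetical placeholder for whatever error type the crate uses. Unlike `debug_assert!`, which is compiled out when debug assertions are disabled, this check runs in release builds too.

```rust
use std::fmt;

/// Inclusive row ID range; fields are private so `new` is the only way to build one.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowRange {
    from: i64,
    to: i64,
}

/// Hypothetical error type for this sketch; a real crate would use its own error.
#[derive(Debug, PartialEq, Eq)]
pub struct InvalidRowRange {
    pub from: i64,
    pub to: i64,
}

impl fmt::Display for InvalidRowRange {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "RowRange from ({}) must be <= to ({})", self.from, self.to)
    }
}

impl std::error::Error for InvalidRowRange {}

impl RowRange {
    /// Validates in every build profile, so an inverted range can never be constructed.
    pub fn new(from: i64, to: i64) -> Result<Self, InvalidRowRange> {
        if from > to {
            return Err(InvalidRowRange { from, to });
        }
        Ok(Self { from, to })
    }

    pub fn from(&self) -> i64 {
        self.from
    }

    pub fn to(&self) -> i64 {
        self.to
    }
}
```

Using `assert!` instead would keep the `-> Self` signature but panic on invalid input; returning `Result` pushes the decision to the caller.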
/// Expand row_ranges into a flat sequence of selected row IDs for a file.
fn expand_selected_row_ids(first_row_id: i64, row_count: i64, row_ranges: &[RowRange]) -> Vec<i64> {
    let file_end = first_row_id + row_count - 1;
What if row_count is 0? Is it expected that file_end will then be less than first_row_id?
Added an early return.
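A minimal sketch of what such an early return might look like is below. Only the signature comes from the diff; the body, the public `RowRange` fields, and the assumption that `row_ranges` is already sorted and merged are illustrative guesses, not the PR's actual code.

```rust
/// Inclusive row ID range (hypothetical stand-in with public fields for this sketch).
#[derive(Debug, Clone)]
pub struct RowRange {
    pub from: i64,
    pub to: i64,
}

/// Expand row_ranges into a flat sequence of selected row IDs for a file.
/// Assumes `row_ranges` is sorted and non-overlapping.
fn expand_selected_row_ids(first_row_id: i64, row_count: i64, row_ranges: &[RowRange]) -> Vec<i64> {
    // Early return: an empty file has no row IDs, and without this guard
    // `file_end` would fall below `first_row_id`.
    if row_count <= 0 {
        return Vec::new();
    }
    let file_end = first_row_id.saturating_add(row_count - 1);
    let mut ids = Vec::new();
    for r in row_ranges {
        // Intersect each requested range with the file's own row ID span.
        let from = r.from.max(first_row_id);
        let to = r.to.min(file_end);
        if from <= to {
            ids.extend(from..=to);
        }
    }
    ids
}
```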
let first_row_id = data_files[0].first_row_id.unwrap_or(0);
let file_row_count = data_files[0].row_count;
let total_rows = match &row_ranges {
    Some(ranges) => expand_selected_row_ids(first_row_id, file_row_count, ranges).len(),
nit: in the plan phase, the ranges in the data split have already gone through merge_row_ranges, yet the read phase here calls merge_row_ranges again. Is that redundant?
Fixed
        current.to = current.to.max(r.to);
    } else {
        merged.push(current);
        current = r.clone();
nit: we don't need the clone; just take ownership:
if ranges.len() <= 1 {
    return ranges;
}
ranges.sort_by_key(|r| r.from);
let mut merged = Vec::with_capacity(ranges.len());
let mut iter = ranges.into_iter();
let mut current = iter.next().unwrap();
for r in iter {
    if r.from <= current.to.saturating_add(1) {
        current.to = current.to.max(r.to);
    } else {
        merged.push(current);
        current = r;
    }
}
merged.push(current);
merged
Updated.
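For readers who want to run the suggestion, here it is wrapped in a complete function. The signature (taking the `Vec` by value as `mut ranges`) and the `RowRange` definition with public fields are assumptions made so the snippet compiles on its own; the body follows the reviewer's suggestion.

```rust
/// Inclusive row ID range (hypothetical stand-in with public fields for this sketch).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RowRange {
    pub from: i64,
    pub to: i64,
}

/// Sort ranges by start and merge any that overlap or are directly adjacent.
pub fn merge_row_ranges(mut ranges: Vec<RowRange>) -> Vec<RowRange> {
    if ranges.len() <= 1 {
        return ranges;
    }
    ranges.sort_by_key(|r| r.from);
    let mut merged = Vec::with_capacity(ranges.len());
    // Consuming the Vec lets each range be moved instead of cloned.
    let mut iter = ranges.into_iter();
    let mut current = iter.next().unwrap(); // safe: len > 1 was checked above
    for r in iter {
        // saturating_add avoids overflow when current.to == i64::MAX.
        if r.from <= current.to.saturating_add(1) {
            current.to = current.to.max(r.to);
        } else {
            merged.push(current);
            current = r;
        }
    }
    merged.push(current);
    merged
}
```

Note that adjacent ranges such as 1..=3 and 4..=6 are merged into 1..=6, since the comparison is against `current.to + 1`.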
…redundant merge, avoid clone in merge_row_ranges, update limit comment
fc63da0 to 1158447
718c2c7 to 1d3cf97
Purpose
Linked issue: sub task of #173
Brief change log
Tests
API and Format
Documentation