Core: Enable column statistics filtering after planning #8803
Conversation
Force-pushed from 946c598 to c3b5139
new ContinuousSplitPlannerImpl(tableResource.tableLoader().clone(), scanContext, null);

ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
Assert.assertEquals(1, initialResult.splits().size());
it would be better to use AssertJ-style assertions for newly added code as that makes migrating away from JUnit4 easier. See also https://github.com/apache/iceberg/blob/a3aff95f9e60962240b94242e24a778760bdd1d9/CONTRIBUTING.md#assertj.
That particular check would then be assertThat(initialResult.splits()).hasSize(1)
These are a few new tests in a test class where different assertion methods are used. I think we should migrate them in one PR, and until then stick to the style used in the rest of the test class.
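The two assertion styles under discussion differ mainly in readability: AssertJ puts the subject first and chains conditions. The sketch below uses a tiny stand-in for AssertJ's `assertThat` so it is self-contained; real code would import `org.assertj.core.api.Assertions.assertThat` from assertj-core instead.

```java
import java.util.Collections;
import java.util.List;

public class AssertStyleSketch {
  // Minimal stand-in for AssertJ's ListAssert, just enough to show the style.
  static class ListAssert {
    private final List<?> actual;

    ListAssert(List<?> actual) {
      this.actual = actual;
    }

    ListAssert hasSize(int expected) {
      if (actual.size() != expected) {
        throw new AssertionError("Expected size " + expected + " but was " + actual.size());
      }
      return this; // chainable, like the real AssertJ API
    }
  }

  static ListAssert assertThat(List<?> actual) {
    return new ListAssert(actual);
  }

  public static void main(String[] args) {
    List<String> splits = Collections.singletonList("split-0");
    // JUnit4 style: Assert.assertEquals(1, splits.size());
    // AssertJ style, subject first:
    assertThat(splits).hasSize(1);
    System.out.println("ok");
  }
}
```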
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java (outdated, resolved)
@pvary I think we probably want to push the
That is for reading the data from the manifest file, so we cannot do the filtering there. We need to read the stat fields from the manifest file, and then filter later for the columns where we do not need them.
If we look at this line, it calls this method. If we push down the selection to the ManifestReader, it can call the new one. I understand the current code is for metadata column selection/projection, not for the columns selected to include stats.
My understanding is that the planning might need stats which are not required by the user. This change improves memory consumption anyway. If we think we need even more improvement and we accept the extra complexity, we can add this feature later.
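The filtering step described above — read the stat maps from the manifest in full, then drop entries for columns the user did not request — might look like this sketch. The names are hypothetical, not the actual Iceberg internals; `null` is treated as "keep everything", matching the default behaviour discussed later in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class StatsFilterSketch {
  // Keeps only the entries whose column id is in columnsToKeepStats.
  // A null request set means "keep all stats" (the undefined-default case).
  static <T> Map<Integer, T> filterStats(Map<Integer, T> stats, Set<Integer> columnsToKeepStats) {
    if (stats == null || columnsToKeepStats == null) {
      return stats;
    }
    Map<Integer, T> filtered = new HashMap<>();
    for (Map.Entry<Integer, T> entry : stats.entrySet()) {
      if (columnsToKeepStats.contains(entry.getKey())) {
        filtered.put(entry.getKey(), entry.getValue());
      }
    }
    return filtered;
  }

  public static void main(String[] args) {
    Map<Integer, Long> valueCounts = new HashMap<>();
    valueCounts.put(1, 100L);
    valueCounts.put(2, 50L);
    // Only the stats for column id 1 survive the copy.
    Map<Integer, Long> kept = filterStats(valueCounts, java.util.Collections.singleton(1));
    System.out.println(kept);
  }
}
```

The same filter would be applied to each of the stat maps (value counts, null value counts, lower bounds, upper bounds) when the file object is copied.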
Force-pushed from 8c183a1 to 95ec264
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java (outdated, resolved)
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java (outdated, resolved)
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java (outdated, resolved)
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java (outdated, resolved)
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java (outdated, resolved)
*
* <p>Column stats include: value count, null value count, lower bounds, and upper bounds.
*
* @param requestedColumns column names for which to keep the stats. If <code>null</code> then all
I am not sure how I feel about supporting null here. Let me think. Also, won't the current implementation throw an exception if we pass null?
I think we should not support null here as a valid value, independently of what we will do in ContentFile. I would drop the doc.
I would guess that the situation is similar with the other @Nullable TableScanContext attributes, like:
- snapshotId
- selectedColumns
- projectedSchema
- fromSnapshotId
- toSnapshotId
- branch
We have an undefined default behaviour which could be achieved either by not setting the values, or by setting them to null. The only difference here is that we define this default behaviour. For consistency's sake we can remove the comment, but the behaviour will remain the same. Am I missing something?
Thanks for the detailed review!
I did a detailed round, would love to hear what others think.
I left some final comments, a lot of them are optional personal suggestions. This looks good to me overall.
@@ -866,6 +866,11 @@ acceptedBreaks:
      old: "method void org.apache.iceberg.encryption.Ciphers::<init>()"
      new: "method void org.apache.iceberg.encryption.Ciphers::<init>()"
      justification: "Static utility class - should not have public constructor"
  "1.4.0":
    org.apache.iceberg:iceberg-core:
      - code: "java.field.serialVersionUIDChanged"
While I think it should be fine, here is an idea. Java comes with a serialver utility that allows us to generate the version UID prior to the change in this PR. We can use that value instead of 1L to be fully compatible. We don't modify the serialization of this class; we simply missed assigning a serialVersionUID. If we can recover the default value, we shouldn't worry about compatibility.
Here is the value I got locally:
cd core/build/classes/java/main
serialver org.apache.iceberg.util.SerializableMap
org.apache.iceberg.util.SerializableMap: private static final long serialVersionUID = -3377238354349859240L;
Could you double check, @pvary? If not, we can keep it as is.
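The mechanism behind the `serialver` suggestion above: when a Serializable class declares no serialVersionUID, the JVM computes a default one from the class shape; pinning that computed value keeps previously serialized bytes readable. A small illustrative sketch (the class and the constant are examples, not the real SerializableMap):

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.HashMap;

public class SerialVersionSketch {
  // Illustrative stand-in for a map class that gained an explicit UID.
  static class SerializableMapLike extends HashMap<String, String> implements Serializable {
    // Declaring the previously-computed default value (42L is just an example
    // constant here) preserves compatibility with bytes written before the
    // field existed; serialver reports that default value.
    private static final long serialVersionUID = 42L;
  }

  public static void main(String[] args) {
    // ObjectStreamClass reports the UID actually in effect for the class,
    // which is what serialver prints on the command line.
    long uid = ObjectStreamClass.lookup(SerializableMapLike.class).getSerialVersionUID();
    System.out.println(uid);
  }
}
```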
Checked, but even when setting the serialVersionUID to -3377238354349859240L we get a revapi failure. I also double checked: serialver generated the same id for the old and the new code on my Mac. So I resorted to the revapi change.
Yeah, I am not sure what revapi actually does. I doubt it compares actual values. I think we should be fine.
core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java (outdated, resolved)
core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java (outdated, resolved)
core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java (outdated, resolved)
  }

  @Override
  public DataFile copy() {
-   return new GenericDataFile(this, true /* full copy */);
+   return new GenericDataFile(this, true /* full copy */, null);
You may consider overloading the constructor so that you don't have to pass an extra null here, or adding a comment for the second argument (we have a comment for true but not for null).
I think this usage is straightforward, and adding a new constructor would not help much, so I did not apply this change.
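For contrast, the reviewer's suggestion above would look roughly like this: an overload hides the trailing null from call sites, while an inline comment documents it explicitly. The class is an illustrative stand-in, not the real GenericDataFile.

```java
public class OverloadSketch {
  static class GenericDataFileLike {
    final boolean fullCopy;
    final java.util.Set<Integer> columnsToKeepStats;

    GenericDataFileLike(GenericDataFileLike toCopy, boolean fullCopy,
        java.util.Set<Integer> columnsToKeepStats) {
      this.fullCopy = fullCopy;
      this.columnsToKeepStats = columnsToKeepStats;
    }

    // Overload: "copy keeping all stats" without a bare null at the call site.
    GenericDataFileLike(GenericDataFileLike toCopy, boolean fullCopy) {
      this(toCopy, fullCopy, null);
    }
  }

  public static void main(String[] args) {
    GenericDataFileLike original = new GenericDataFileLike(null, true, null);
    // Either spelling works; the overload reads cleaner, the explicit form
    // with a comment on each argument is more direct:
    GenericDataFileLike a = new GenericDataFileLike(original, true /* full copy */);
    GenericDataFileLike b =
        new GenericDataFileLike(original, true /* full copy */, null /* keep all stats */);
    System.out.println(a.columnsToKeepStats == null && b.columnsToKeepStats == null);
  }
}
```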
core/src/main/java/org/apache/iceberg/IncrementalDataTableScan.java (outdated, resolved)
@@ -154,6 +156,12 @@ ManifestGroup caseSensitive(boolean newCaseSensitive) {
    return this;
  }

  ManifestGroup columnsToKeepStats(Set<Integer> newColumnsToKeepStats) {
    this.columnsToKeepStats =
        newColumnsToKeepStats == null ? null : Sets.newHashSet(newColumnsToKeepStats);
This copy seems redundant but up to you.
Kept as is, since it is more consistent with the other implementations.
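The copy the reviewers debated above is the standard defensive-copy pattern for builders: duplicating the incoming set means later mutation by the caller cannot change the builder's state. A self-contained sketch with illustrative names (the real code uses Guava's `Sets.newHashSet`; `new HashSet<>` is equivalent here):

```java
import java.util.HashSet;
import java.util.Set;

public class DefensiveCopySketch {
  private Set<Integer> columnsToKeepStats;

  // Copy the caller's set so the builder owns its own state; null stays null.
  DefensiveCopySketch columnsToKeepStats(Set<Integer> newColumnsToKeepStats) {
    this.columnsToKeepStats =
        newColumnsToKeepStats == null ? null : new HashSet<>(newColumnsToKeepStats);
    return this;
  }

  Set<Integer> columnsToKeepStats() {
    return columnsToKeepStats;
  }

  public static void main(String[] args) {
    Set<Integer> callerSet = new HashSet<>();
    callerSet.add(1);
    DefensiveCopySketch builder = new DefensiveCopySketch().columnsToKeepStats(callerSet);
    callerSet.add(2); // mutating the caller's set...
    System.out.println(builder.columnsToKeepStats().size()); // ...does not affect the builder
  }
}
```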
@rdblue: I have fixed the changes you requested. If you have any further comments, please leave a review. @aokolnychyi did another thorough review and I applied most of his suggested changes. So with two +1s, I would like to merge this change in the next few days. Thanks!
I went through Ryan's comments one more time. They seem to be addressed. I also think the current version is simpler. Let's merge it as is and follow up if needed to unblock subsequent changes in Flink. @rdblue, please let us know if you spot anything else.
Thanks @nastra, @rdblue, @stevenzwu, @aokolnychyi for the diligent reviews!
Co-authored-by: Peter Vary <peter_vary4@apple.com>
Based on our discussion on the dev list, I have created this PR, which makes it possible to narrow down the retained column statistics in the ScanTask returned from planning.

For reference, the discussion: https://lists.apache.org/thread/pcfpztld5gfpdvm1dy4l84xfl6odxhw8

The PR makes it possible to set the includeColumnStats for a Scan. The resulting ScanTasks will contain column statistics for the specified columnIds only, omitting statistics which might be present in the metadata files but not specifically requested by the user.

The PR consists of 3 main parts:
- Scan.includeColumnStats to set the required columnIds
- ContentFile.copyWithSpecificStats to provide an interface for the stat removal when copying the file objects
- A BaseFile constructor which takes care of the statistics filtering, making sure that the other implementations use this method as well
- Pushing the columnStatsToInclude field through the different scan implementations and putting it into the TableScanContext
- ManifestGroup builder changes to store the columnsToKeepStats; this class is responsible for the final copy of the DataFiles where we remove the statistics which are not needed
- A new FlinkReadOption to set which column stats we should keep: column-stats-to-keep
- ScanContext and Planner changes to propagate the values
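The parts listed above fit together roughly as follows. This is a hypothetical sketch with stand-in classes, not the real Iceberg API: the scan records the requested stats columns (as TableScanContext does), and planning copies each data file keeping only those stats.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ScanStatsFlowSketch {
  static class DataFileLike {
    Map<Integer, Long> valueCounts = new HashMap<>();

    // Analogous to copying the file with a restricted stats set:
    // null means "keep all stats".
    DataFileLike copyWithStatsFor(Set<Integer> columnIds) {
      DataFileLike copy = new DataFileLike();
      for (Map.Entry<Integer, Long> e : valueCounts.entrySet()) {
        if (columnIds == null || columnIds.contains(e.getKey())) {
          copy.valueCounts.put(e.getKey(), e.getValue());
        }
      }
      return copy;
    }
  }

  static class ScanLike {
    private Set<Integer> columnStatsToInclude; // held in something like TableScanContext

    ScanLike includeColumnStats(Set<Integer> columnIds) {
      this.columnStatsToInclude = columnIds;
      return this;
    }

    List<DataFileLike> planFiles(List<DataFileLike> manifestFiles) {
      List<DataFileLike> tasks = new ArrayList<>();
      for (DataFileLike file : manifestFiles) {
        // Stats are filtered at copy time, when the planned tasks are built.
        tasks.add(file.copyWithStatsFor(columnStatsToInclude));
      }
      return tasks;
    }
  }

  public static void main(String[] args) {
    DataFileLike file = new DataFileLike();
    file.valueCounts.put(1, 100L);
    file.valueCounts.put(2, 200L);
    List<DataFileLike> tasks = new ScanLike()
        .includeColumnStats(Collections.singleton(1))
        .planFiles(Collections.singletonList(file));
    System.out.println(tasks.get(0).valueCounts); // stats kept only for column id 1
  }
}
```

Note that the real API takes column names rather than ids; the id-based sketch just shows where the filtering happens in the flow.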