-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a Spark procedure to collect NDV #6582
base: main
Are you sure you want to change the base?
Conversation
@@ -26,4 +26,6 @@ private StandardBlobTypes() {} | |||
* href="https://datasketches.apache.org/">Apache DataSketches</a> library | |||
*/ | |||
public static final String APACHE_DATASKETCHES_THETA_V1 = "apache-datasketches-theta-v1"; | |||
|
|||
public static final String NDV_BLOB = "ndv-blob"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spark doesn't use Apache DataSketches to collect approximate NDV, so I am adding a new blob type. Hope this is OK.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@findepi What are you using for NDV stats here? I figure we should have a common blob type
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'blob' seems a bit redundant as they are all blobs? And also looking at the code, it's an approx ndv, which I didnt get from this name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should use one blob type for NDV ideally, although Spark doesn't have the sketch data. I'm also curious how sketch data is useful for a table level metric. It is absolutely useful for file-level and partition-level since we can merge them later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@szehon-ho Right, we should have a better name for this. I am not sure if we can have a common blob type here. I will wait for @findepi 's input before changing this one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So it is impossible for us to make Theta Sketches using Spark? Things like that would be healthier for the long run if we implement that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you @RussellSpitzer . BTW engine interop is the primary reason why we have settled on Theta sketches. For Trino it would be easier to go with HLL, since that's what Trino engine & SPI are supporting for years now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@RussellSpitzer @findepi I agree it would be ideal if Spark can support Theta sketches. I will take a look to see the possibility to implement this.
@findepi Besides using NDV, Spark also uses other column stats such as NumOfNulls, Min, Max, etc. for CBO. I am wondering if Trino also use these stats and if these stats should also be stored in TableMetaData
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should create real Theta sketches. If Spark only needs the NDV integer, then that's great. We can either keep track of NDV sketch and incrementally update internal to Iceberg, or we can do it async. Either way, there should be no need for a different sketch type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be clear I don't think we need to get this into OSS Spark, I think it's fine if we generate these sketches in user land code.
/** | ||
* A procedure that gets approximate NDV (number of distinct value) for the requested columns | ||
* and sets this to the table's StatisticsFile. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am debating myself if I should collect ndv only or also collect everything else such as max, min, num_nulls etc. in ANALYZE TABLE. I will just collect ndv for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea we have all those in the Iceberg file level metadata already, wonder if its necessary as we could combine those to have an aggregate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have file level metadata for max, min, num_nulls etc. That's why I was hesitate to include those here. We don't have file level ndv, though.
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/DistinctCountProcedure.java
Show resolved
Hide resolved
String query = "SELECT "; | ||
for (int i = 0; i < columnSizes; i++) { | ||
String colName = columns.getUTF8String(i).toString(); | ||
query += "APPROX_COUNT_DISTINCT(" + colName + "), "; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we are technically not using distinct here, maybe we should be calling the procedure "analyze"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will change the procedure name from DistinctCountProcedure
to AnalyzeTableProcedure
for (int i = 0; i < columnSizes; i++) { | ||
writer.add( | ||
new Blob( | ||
StandardBlobTypes.NDV_BLOB, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The issue with defining a new a new blob type here is we probably need to describe in the spec, otherwise folks won't be able to deserialize it
@@ -26,4 +26,6 @@ private StandardBlobTypes() {} | |||
* href="https://datasketches.apache.org/">Apache DataSketches</a> library | |||
*/ | |||
public static final String APACHE_DATASKETCHES_THETA_V1 = "apache-datasketches-theta-v1"; | |||
|
|||
public static final String NDV_BLOB = "ndv-blob"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'blob' seems a bit redundant as they are all blobs? And also looking at the code, it's an approx ndv, which I didnt get from this name.
/** | ||
* A procedure that gets approximate NDV (number of distinct value) for the requested columns | ||
* and sets this to the table's StatisticsFile. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea we have all those in the Iceberg file level metadata already, wonder if its necessary as we could combine those to have an aggregate?
|
||
TableOperations operations = ((HasTableOperations) table).operations(); | ||
FileIO fileIO = ((HasTableOperations) table).operations().io(); | ||
String path = operations.metadataFileLocation(String.format("%s.stats", UUID.randomUUID())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it exists, we throw FileNotFoundException? Should we just check and throw better exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean AlreadyExistsException
, right? Yes, we should check. I guess we can probably keep the AlreadyExistsException
but make the error message better.
|
||
String viewName = viewName(args, tableName); | ||
// Create a view for users to query | ||
df.createOrReplaceTempView(viewName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if I missed something, is there a point to keeping it as view, if its already returned by the procedure?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I kept this as a view so users will have an easy way to query the statistics information after calling the stored procedure.
The main reason I am adding this store procedure is because I can't get an agreement to implement ANALYZE TABLE
for data source V2 in Spark. This stored procedure is doing something similar to ANALYZE TABLE
. Normally after users analyze table, they will DESCRIBE
to get the statistics information. I create a view so users can query the statistics.
} | ||
|
||
query = query.substring(0, query.length() - 2) + " FROM " + tableName; | ||
Dataset<Row> df = spark().sql(query); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@RussellSpitzer, @flyrain, @huaxingao: Is it good to have a spark action first and call that action from this procedure? This way the users who use only APIs can also leverage this feature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That sounds reasonable. RewriteManifestsProcedure
did the same thing.
OutputFile outputFile = fileIO.newOutputFile(path); | ||
|
||
try (PuffinWriter writer = | ||
Puffin.write(outputFile).createdBy("Spark DistinctCountProcedure").build()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Can we move "Spark DistinctCountProcedure" to a separate constant?
I saw a new PR on the same : #10288 |
Add a Spark procedure to collect NDV, which will be used for CBO.